This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this 
push:
     new d30d429847b support native format (#58878)
d30d429847b is described below

commit d30d429847b64009d8f0104d6cb79856670f9974
Author: lihangyu <[email protected]>
AuthorDate: Tue Dec 9 22:16:38 2025 +0800

    support native format (#58878)
---
 be/src/service/internal_service.cpp                |   9 +
 be/src/vec/exec/format/native/native_reader.cpp    | 351 +++++++++++++
 be/src/vec/exec/format/native/native_reader.h      | 121 +++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  18 +-
 be/src/vec/functions/function_cast.h               |  13 +-
 be/src/vec/runtime/vnative_transformer.cpp         | 121 +++++
 be/src/vec/runtime/vnative_transformer.h           |  65 +++
 be/src/vec/sink/writer/vfile_result_writer.cpp     |   9 +
 .../format/native/native_reader_writer_test.cpp    | 564 +++++++++++++++++++++
 .../doris/common/util/FileFormatConstants.java     |   1 +
 .../java/org/apache/doris/common/util/Util.java    |   3 +
 .../property/fileformat/FileFormatProperties.java  |   3 +
 .../fileformat/NativeFileFormatProperties.java     |  65 +++
 .../apache/doris/load/LoadExprTransformUtils.java  |   2 +-
 .../ExternalFileTableValuedFunction.java           |  13 +-
 gensrc/thrift/PlanNodes.thrift                     |   3 +-
 .../outfile/native/test_outfile_native.out         |  25 +
 .../outfile/native/test_outfile_native.groovy      | 100 ++++
 .../test_export_variant_10k_columns.groovy         | 238 +++++++++
 19 files changed, 1710 insertions(+), 14 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 15ddf30ce53..f962f47394e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -134,6 +134,7 @@
 #include "vec/exec/format/csv/csv_reader.h"
 #include "vec/exec/format/generic_reader.h"
 #include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/text/text_reader.h"
@@ -855,6 +856,14 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
             reader = vectorized::OrcReader::create_unique(params, range, "", 
&io_ctx);
             break;
         }
+        case TFileFormatType::FORMAT_NATIVE: {
+            // Doris Native binary format reader for schema probing in S3/FILE 
TVF.
+            // NOTE(review): batch_size is currently unused by NativeReader
+            size_t batch_size = 4096;
+            reader = vectorized::NativeReader::create_unique(profile.get(), 
params, range,
+                                                             batch_size, 
&io_ctx, nullptr);
+            break;
+        }
         case TFileFormatType::FORMAT_JSON: {
             reader = vectorized::NewJsonReader::create_unique(profile.get(), 
params, range,
                                                               file_slots, 
&io_ctx);
diff --git a/be/src/vec/exec/format/native/native_reader.cpp 
b/be/src/vec/exec/format/native/native_reader.cpp
new file mode 100644
index 00000000000..e01c78890eb
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.cpp
@@ -0,0 +1,351 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/native/native_reader.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include "io/file_factory.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/tracing_file_reader.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+NativeReader::NativeReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
+                           const TFileRangeDesc& range, size_t /*batch_size*/,
+                           io::IOContext* io_ctx, RuntimeState* state)
+        : _profile(profile),
+          _scan_params(params),
+          _scan_range(range),
+          _io_ctx(io_ctx),
+          _state(state) {}
+
+NativeReader::~NativeReader() {
+    (void)close();
+}
+
+Status NativeReader::init_reader() {
+    if (_file_reader != nullptr) {
+        return Status::OK();
+    }
+
+    // Create underlying file reader. For now we always use random access mode.
+    io::FileSystemProperties system_properties;
+    io::FileDescription file_description;
+    if (_scan_range.__isset.file_size) {
+        file_description.file_size = _scan_range.file_size;
+    } else {
+        file_description.file_size = -1;
+    }
+    file_description.path = _scan_range.path;
+    if (_scan_range.__isset.fs_name) {
+        file_description.fs_name = _scan_range.fs_name;
+    }
+    if (_scan_range.__isset.modification_time) {
+        file_description.mtime = _scan_range.modification_time;
+    } else {
+        file_description.mtime = 0;
+    }
+
+    if (_scan_range.__isset.file_type) {
+        // For compatibility with older FE.
+        system_properties.system_type = _scan_range.file_type;
+    } else {
+        system_properties.system_type = _scan_params.file_type;
+    }
+    system_properties.properties = _scan_params.properties;
+    system_properties.hdfs_params = _scan_params.hdfs_params;
+    if (_scan_params.__isset.broker_addresses) {
+        
system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
+                                                  
_scan_params.broker_addresses.end());
+    }
+
+    io::FileReaderOptions reader_options =
+            FileFactory::get_reader_options(_state, file_description);
+    auto reader_res = io::DelegateReader::create_file_reader(
+            _profile, system_properties, file_description, reader_options,
+            io::DelegateReader::AccessMode::RANDOM, _io_ctx);
+    if (!reader_res.has_value()) {
+        return reader_res.error();
+    }
+    _file_reader = reader_res.value();
+
+    if (_io_ctx) {
+        _file_reader =
+                std::make_shared<io::TracingFileReader>(_file_reader, 
_io_ctx->file_reader_stats);
+    }
+
+    _file_size = _file_reader->size();
+    _current_offset = 0;
+    _eof = (_file_size == 0);
+
+    // Detect the optional Doris Native file header. Headered files start with
+    // [magic bytes "DORISN1\0"][uint32_t format_version], followed by the
+    // regular [uint64_t block_size][PBlock bytes]... stream of blocks.
+    if (!_eof && _file_size >= 12) {
+        char header[12];
+        Slice header_slice(header, sizeof(header));
+        size_t bytes_read = 0;
+        RETURN_IF_ERROR(_file_reader->read_at(0, header_slice, &bytes_read));
+        if (bytes_read == sizeof(header)) {
+            static constexpr char NATIVE_MAGIC[8] = {'D', 'O', 'R', 'I', 'S', 
'N', '1', '\0'};
+            if (memcmp(header, NATIVE_MAGIC, sizeof(NATIVE_MAGIC)) == 0) {
+                // We currently only have format version 1, but keep the value 
for future use.
+                uint32_t version = 0;
+                memcpy(&version, header + sizeof(NATIVE_MAGIC), 
sizeof(uint32_t));
+                (void)version;
+                _current_offset = sizeof(header);
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
+    if (_eof) {
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(init_reader());
+
+    std::string buff;
+    bool local_eof = false;
+
+    // If we have already loaded the first block for schema probing, use it 
first.
+    if (_first_block_loaded && !_first_block_consumed) {
+        buff = _first_block_buf;
+        local_eof = false;
+    } else {
+        RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof));
+    }
+
+    // If we reach EOF and also read no data for this call, the whole file is 
considered finished.
+    if (local_eof && buff.empty()) {
+        *read_rows = 0;
+        *eof = true;
+        _eof = true;
+        return Status::OK();
+    }
+    // If buffer is empty but we have not reached EOF yet, treat this as an 
error.
+    if (buff.empty()) {
+        return Status::InternalError("read empty native block from file {}", 
_scan_range.path);
+    }
+
+    PBlock pblock;
+    if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) {
+        return Status::InternalError("Failed to parse native PBlock from file 
{}",
+                                     _scan_range.path);
+    }
+
+    // Initialize schema from first block if not done yet.
+    if (!_schema_inited) {
+        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+    }
+
+    // deserialize PBlock into Block. Older Block API only exposes a 
single-argument
+    // deserialize method without decompression statistics.
+    RETURN_IF_ERROR(block->deserialize(pblock));
+
+    // For external file scan / TVF scenarios, unify all columns as nullable 
to match
+    // GenericReader/SlotDescriptor convention. This ensures schema 
consistency when
+    // some writers emit non-nullable columns.
+    for (size_t i = 0; i < block->columns(); ++i) {
+        auto& col_with_type = block->get_by_position(i);
+        if (!col_with_type.type->is_nullable()) {
+            col_with_type.column = make_nullable(col_with_type.column);
+            col_with_type.type = make_nullable(col_with_type.type);
+        }
+    }
+
+    *read_rows = block->rows();
+    *eof = false;
+
+    if (_first_block_loaded && !_first_block_consumed) {
+        _first_block_consumed = true;
+    }
+
+    // If we reached the physical end of file, mark eof for subsequent calls.
+    if (_current_offset >= _file_size) {
+        _eof = true;
+    }
+
+    return Status::OK();
+}
+
+Status NativeReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
+                                 std::unordered_set<std::string>* 
missing_cols) {
+    missing_cols->clear();
+    RETURN_IF_ERROR(init_reader());
+
+    if (!_schema_inited) {
+        // Load first block lazily to initialize schema.
+        if (!_first_block_loaded) {
+            bool local_eof = false;
+            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+            // Treat file as empty only if we reach EOF and there is no block 
data at all.
+            if (local_eof && _first_block_buf.empty()) {
+                return Status::EndOfFile("empty native file {}", 
_scan_range.path);
+            }
+            // Non-EOF but empty buffer means corrupted native file.
+            if (_first_block_buf.empty()) {
+                return Status::InternalError("first native block is empty {}", 
_scan_range.path);
+            }
+            _first_block_loaded = true;
+        }
+
+        PBlock pblock;
+        if (!pblock.ParseFromArray(_first_block_buf.data(),
+                                   static_cast<int>(_first_block_buf.size()))) 
{
+            return Status::InternalError("Failed to parse native PBlock for 
schema from file {}",
+                                         _scan_range.path);
+        }
+        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+    }
+
+    for (size_t i = 0; i < _schema_col_names.size(); ++i) {
+        // convert internal DataTypePtr to TypeDescriptor for external schema
+        name_to_type->emplace(_schema_col_names[i],
+                              
_schema_col_types[i]->get_type_as_type_descriptor());
+    }
+    return Status::OK();
+}
+
+Status NativeReader::init_schema_reader() {
+    RETURN_IF_ERROR(init_reader());
+    return Status::OK();
+}
+
+Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                       std::vector<TypeDescriptor>* col_types) 
{
+    RETURN_IF_ERROR(init_reader());
+
+    if (!_schema_inited) {
+        if (!_first_block_loaded) {
+            bool local_eof = false;
+            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+            // Treat file as empty only if we reach EOF and there is no block 
data at all.
+            if (local_eof && _first_block_buf.empty()) {
+                return Status::EndOfFile("empty native file {}", 
_scan_range.path);
+            }
+            // Non-EOF but empty buffer means corrupted native file.
+            if (_first_block_buf.empty()) {
+                return Status::InternalError("first native block is empty {}", 
_scan_range.path);
+            }
+            _first_block_loaded = true;
+        }
+
+        PBlock pblock;
+        if (!pblock.ParseFromArray(_first_block_buf.data(),
+                                   static_cast<int>(_first_block_buf.size()))) 
{
+            return Status::InternalError("Failed to parse native PBlock for 
schema from file {}",
+                                         _scan_range.path);
+        }
+        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+    }
+
+    *col_names = _schema_col_names;
+    col_types->clear();
+    col_types->reserve(_schema_col_types.size());
+    for (const auto& dt : _schema_col_types) {
+        col_types->emplace_back(dt->get_type_as_type_descriptor());
+    }
+    return Status::OK();
+}
+
+Status NativeReader::close() {
+    _file_reader.reset();
+    return Status::OK();
+}
+
+Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) {
+    *eof = false;
+    buff->clear();
+
+    if (_file_reader == nullptr) {
+        RETURN_IF_ERROR(init_reader());
+    }
+
+    if (_current_offset >= _file_size) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    uint64_t len = 0;
+    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, 
&bytes_read));
+    if (bytes_read == 0) {
+        *eof = true;
+        return Status::OK();
+    }
+    if (bytes_read != sizeof(len)) {
+        return Status::InternalError(
+                "Failed to read native block length from file {}, expect {}, "
+                "actual {}",
+                _scan_range.path, sizeof(len), bytes_read);
+    }
+
+    _current_offset += sizeof(len);
+    if (len == 0) {
+        // Empty block, nothing to read.
+        *eof = (_current_offset >= _file_size);
+        return Status::OK();
+    }
+
+    buff->assign(len, '\0');
+    Slice data_slice(buff->data(), len);
+    bytes_read = 0;
+    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, 
&bytes_read));
+    if (bytes_read != len) {
+        return Status::InternalError(
+                "Failed to read native block body from file {}, expect {}, "
+                "actual {}",
+                _scan_range.path, len, bytes_read);
+    }
+
+    _current_offset += len;
+    *eof = (_current_offset >= _file_size);
+    return Status::OK();
+}
+
+Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) {
+    _schema_col_names.clear();
+    _schema_col_types.clear();
+
+    for (const auto& pcol_meta : pblock.column_metas()) {
+        DataTypePtr type = 
make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta));
+        LOG(INFO) << "init_schema_from_pblock, name=" << pcol_meta.name()
+                  << ", type=" << type->get_name();
+        _schema_col_names.emplace_back(pcol_meta.name());
+        _schema_col_types.emplace_back(type);
+    }
+    _schema_inited = true;
+    return Status::OK();
+}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/native/native_reader.h 
b/be/src/vec/exec/format/native/native_reader.h
new file mode 100644
index 00000000000..6f62cff9f09
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+struct TypeDescriptor;
+class PBlock;
+
+namespace io {
+struct IOContext;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
+// Doris Native format reader.
+// It reads a sequence of Blocks encoded in the Doris Native binary format:
+// an optional file header followed by a stream of length-prefixed,
+// serialized PBlock messages.
+class NativeReader : public GenericReader {
+public:
+    ENABLE_FACTORY_CREATOR(NativeReader);
+
+    NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+                 const TFileRangeDesc& range, size_t batch_size, 
io::IOContext* io_ctx,
+                 RuntimeState* state);
+
+    ~NativeReader() override;
+
+#ifdef BE_TEST
+    // for unit test: inject an existing FileReader and skip FileFactory path
+    void set_file_reader(io::FileReaderSPtr file_reader) {
+        _file_reader = std::move(file_reader);
+        _io_ctx = nullptr;
+        _file_size = _file_reader ? _file_reader->size() : 0;
+        _current_offset = 0;
+        _eof = (_file_size == 0);
+    }
+#endif
+
+    // Initialize underlying file reader and any format specific state.
+    Status init_reader();
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_schema_reader() override;
+
+    Status get_parsed_schema(std::vector<std::string>* col_names,
+                             std::vector<TypeDescriptor>* col_types) override;
+
+    Status close() override;
+
+    bool count_read_rows() override { return true; }
+
+protected:
+    void _collect_profile_before_close() override {}
+
+private:
+    RuntimeProfile* _profile = nullptr;
+    const TFileScanRangeParams& _scan_params;
+    const TFileRangeDesc& _scan_range;
+
+    io::FileReaderSPtr _file_reader;
+    io::IOContext* _io_ctx = nullptr;
+    RuntimeState* _state = nullptr;
+
+    bool _eof = false;
+
+    // Current read offset in the underlying file.
+    int64_t _current_offset = 0;
+    int64_t _file_size = 0;
+
+    // Cached schema information from the first PBlock.
+    bool _schema_inited = false;
+    std::vector<std::string> _schema_col_names;
+    std::vector<DataTypePtr> _schema_col_types;
+
+    // Cached first block (serialized) to allow schema probing before data 
scan.
+    std::string _first_block_buf;
+    bool _first_block_loaded = false;
+    bool _first_block_consumed = false;
+
+    Status _read_next_pblock(std::string* buff, bool* eof);
+    Status _init_schema_from_pblock(const PBlock& pblock);
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 54f9c807989..e46f4141f69 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -60,6 +60,7 @@
 #include "vec/exec/format/avro/avro_jni_reader.h"
 #include "vec/exec/format/csv/csv_reader.h"
 #include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/hive_reader.h"
@@ -597,10 +598,6 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
             // skip columns which does not exist in file
             continue;
         }
-        if (slot_desc->type().is_variant_type()) {
-            // skip variant type
-            continue;
-        }
         auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
         auto return_type = slot_desc->get_data_type_ptr();
         // remove nullable here, let the get_function decide whether nullable
@@ -612,8 +609,10 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
                 "CAST", arguments, return_type,
                 {.enable_decimal256 = runtime_state()->enable_decimal256()});
         idx = _src_block_name_to_idx[slot_desc->col_name()];
+        DCHECK(_state != nullptr);
+        auto ctx = FunctionContext::create_context(_state, {}, {});
         RETURN_IF_ERROR(
-                func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, 
arg.column->size()));
+                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, 
arg.column->size()));
         _src_block_ptr->get_by_position(idx).type = std::move(return_type);
     }
     return Status::OK();
@@ -1272,6 +1271,15 @@ Status VFileScanner::_get_next_reader() {
             init_status = 
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
             break;
         }
+        case TFileFormatType::FORMAT_NATIVE: {
+            auto reader = NativeReader::create_unique(_profile, *_params, 
range,
+                                                      
_state->query_options().batch_size,
+                                                      _io_ctx.get(), _state);
+            init_status = reader->init_reader();
+            _cur_reader = std::move(reader);
+            need_to_get_parsed_schema = false;
+            break;
+        }
         case TFileFormatType::FORMAT_ARROW: {
             _cur_reader = ArrowStreamReader::create_unique(_state, _profile, 
&_counter, *_params,
                                                            range, 
_file_slot_descs, _io_ctx.get());
diff --git a/be/src/vec/functions/function_cast.h 
b/be/src/vec/functions/function_cast.h
index 3644abae614..a6b25f79d23 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -2124,10 +2124,10 @@ private:
     }
 
     struct ConvertImplGenericFromVariant {
-        static Status execute(const FunctionCast* fn, FunctionContext* 
context, Block& block,
+        static Status execute(const FunctionCast* fn, const DataTypePtr& 
data_type_to,
+                              FunctionContext* context, Block& block,
                               const ColumnNumbers& arguments, const size_t 
result,
                               size_t input_rows_count) {
-            auto& data_type_to = block.get_by_position(result).type;
             const auto& col_with_type_and_name = 
block.get_by_position(arguments[0]);
             auto& col_from = col_with_type_and_name.column;
             auto& variant = assert_cast<const ColumnObject&>(*col_from);
@@ -2230,10 +2230,11 @@ private:
     // create cresponding type convert from variant
     WrapperType create_variant_wrapper(const DataTypeObject& from_type,
                                        const DataTypePtr& to_type) const {
-        return [this](FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                      const size_t result, size_t input_rows_count) -> Status {
-            return ConvertImplGenericFromVariant::execute(this, context, 
block, arguments, result,
-                                                          input_rows_count);
+        return [this, to_type](FunctionContext* context, Block& block,
+                               const ColumnNumbers& arguments, const size_t 
result,
+                               size_t input_rows_count) -> Status {
+            return ConvertImplGenericFromVariant::execute(this, to_type, 
context, block, arguments,
+                                                          result, 
input_rows_count);
         };
     }
 
diff --git a/be/src/vec/runtime/vnative_transformer.cpp 
b/be/src/vec/runtime/vnative_transformer.cpp
new file mode 100644
index 00000000000..378f7c83196
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.cpp
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vnative_transformer.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "agent/be_exec_version_manager.h"
+#include "io/fs/file_writer.h"
+#include "runtime/runtime_state.h"
+#include "util/slice.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+namespace {
+
+// Map high-level TFileCompressType to low-level segment_v2::CompressionTypePB.
+segment_v2::CompressionTypePB 
to_local_compression_type(TFileCompressType::type type) {
+    using CT = segment_v2::CompressionTypePB;
+    switch (type) {
+    case TFileCompressType::GZ:
+    case TFileCompressType::ZLIB:
+    case TFileCompressType::DEFLATE:
+        return CT::ZLIB;
+    case TFileCompressType::LZ4FRAME:
+    case TFileCompressType::LZ4BLOCK:
+        return CT::LZ4;
+    case TFileCompressType::SNAPPYBLOCK:
+        return CT::SNAPPY;
+    case TFileCompressType::ZSTD:
+        return CT::ZSTD;
+    default:
+        return CT::ZSTD;
+    }
+}
+
+} // namespace
+
+VNativeTransformer::VNativeTransformer(RuntimeState* state, 
doris::io::FileWriter* file_writer,
+                                       const VExprContextSPtrs& 
output_vexpr_ctxs,
+                                       bool output_object_data,
+                                       TFileCompressType::type compress_type)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _file_writer(file_writer),
+          _compression_type(to_local_compression_type(compress_type)) {}
+
+Status VNativeTransformer::open() {
+    // No extra initialization is required for now. The underlying FileWriter
+    // is managed by VFileResultWriter.
+    return Status::OK();
+}
+
+Status VNativeTransformer::write(const Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    // Serialize Block into PBlock using existing vec serialization logic.
+    PBlock pblock;
+    size_t uncompressed_bytes = 0;
+    size_t compressed_bytes = 0;
+
+    
RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(), 
&pblock,
+                                    &uncompressed_bytes, &compressed_bytes,
+                                    _compression_type, true));
+
+    std::string buff;
+    if (!pblock.SerializeToString(&buff)) {
+        auto err = Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
+                "serialize native block error. block rows: {}", block.rows());
+        return err;
+    }
+
+    // Layout of Doris Native file:
+    // [uint64_t block_size][PBlock bytes]...
+    uint64_t len = buff.size();
+    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+    RETURN_IF_ERROR(_file_writer->append(len_slice));
+    RETURN_IF_ERROR(_file_writer->append(buff));
+
+    _written_len += sizeof(len) + buff.size();
+    _cur_written_rows += block.rows();
+
+    return Status::OK();
+}
+
+Status VNativeTransformer::close() {
+    // Close underlying FileWriter to ensure data is flushed to disk.
+    if (_file_writer != nullptr && _file_writer->state() != 
doris::io::FileWriter::State::CLOSED) {
+        RETURN_IF_ERROR(_file_writer->close());
+    }
+
+    return Status::OK();
+}
+
+int64_t VNativeTransformer::written_len() {
+    return _written_len;
+}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vnative_transformer.h 
b/be/src/vec/runtime/vnative_transformer.h
new file mode 100644
index 00000000000..fdf7ff46ac9
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vfile_format_transformer.h"
+
+namespace doris::io {
+class FileWriter;
+} // namespace doris::io
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
+// Doris Native format writer.
+// It serializes vectorized Blocks into Doris Native binary format.
+class VNativeTransformer final : public VFileFormatTransformer {
+public:
+    // |compress_type| controls how the PBlock is compressed on disk (ZSTD, 
LZ4, etc).
+    // Defaults to ZSTD to preserve the previous behavior.
+    VNativeTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                       const VExprContextSPtrs& output_vexpr_ctxs, bool 
output_object_data,
+                       TFileCompressType::type compress_type = 
TFileCompressType::ZSTD);
+
+    ~VNativeTransformer() override = default;
+
+    Status open() override;
+
+    Status write(const Block& block) override;
+
+    Status close() override;
+
+    int64_t written_len() override;
+
+private:
+    doris::io::FileWriter* _file_writer; // not owned
+    int64_t _written_len = 0;
+    // Compression type used for Block::serialize (PBlock compression).
+    segment_v2::CompressionTypePB _compression_type 
{segment_v2::CompressionTypePB::ZSTD};
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 9275245acf6..74bbe355b46 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -57,6 +57,7 @@
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vnative_transformer.h"
 #include "vec/runtime/vorc_transformer.h"
 #include "vec/runtime/vparquet_transformer.h"
 
@@ -143,6 +144,12 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
                 _state, _file_writer_impl.get(), _vec_output_expr_ctxs, 
_file_opts->orc_schema, {},
                 _output_object_data, _file_opts->orc_compression_type));
         break;
+    case TFileFormatType::FORMAT_NATIVE:
+        // Doris Native binary format writer with configurable compression.
+        _vfile_writer.reset(new VNativeTransformer(_state, 
_file_writer_impl.get(),
+                                                   _vec_output_expr_ctxs, 
_output_object_data,
+                                                   
_file_opts->compression_type));
+        break;
     default:
         return Status::InternalError("unsupported file format: {}", 
_file_opts->file_format);
     }
@@ -200,6 +207,8 @@ std::string VFileResultWriter::_file_format_to_name() {
         return "parquet";
     case TFileFormatType::FORMAT_ORC:
         return "orc";
+    case TFileFormatType::FORMAT_NATIVE:
+        return "native";
     default:
         return "unknown";
     }
diff --git a/be/test/vec/exec/format/native/native_reader_writer_test.cpp 
b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
new file mode 100644
index 00000000000..2a61c12eb0d
--- /dev/null
+++ b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/path.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/runtime_state.h"
+#include "util/uid_util.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_struct.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/data_types/serde/data_type_serde.h"
+#include "vec/exec/format/native/native_reader.h"
+#include "vec/runtime/vnative_transformer.h"
+
+namespace doris::vectorized {
+
+class NativeReaderWriterTest : public ::testing::Test {};
+
+static void fill_primitive_columns(Block& block, size_t rows) {
+    DataTypePtr int_type =
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
+    DataTypePtr str_type =
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false));
+
+    {
+        MutableColumnPtr col = int_type->create_column();
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 3 == 0) {
+                // null
+                col->insert_default();
+            } else {
+                // insert int value via Field to match column interface
+                auto field =
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i * 10));
+                col->insert(field);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), int_type, 
"int_col"));
+    }
+
+    {
+        MutableColumnPtr col = str_type->create_column();
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 4 == 0) {
+                col->insert_default();
+            } else {
+                std::string v = "s" + std::to_string(i);
+                // insert varchar value via Field
+                auto field = 
Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
+                col->insert(field);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), str_type, 
"str_col"));
+    }
+}
+
+static void fill_array_column(Block& block, size_t rows) {
+    // array<int>
+    DataTypePtr arr_nested_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+    DataTypePtr arr_type = 
make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
+
+    {
+        MutableColumnPtr col = arr_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+        auto& array_col = assert_cast<ColumnArray&>(nested);
+        auto& offsets = array_col.get_offsets();
+        auto& data = array_col.get_data();
+        auto mutable_data = data.assume_mutable();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 5 == 0) {
+                // null array
+                nullable_col->insert_default();
+            } else {
+                // non-null array with 3 elements: [i, i+1, i+2]
+                null_map.push_back(0);
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 2)));
+                offsets.push_back(offsets.empty() ? 3 : offsets.back() + 3);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), arr_type, 
"arr_col"));
+    }
+}
+
+static void fill_map_column(Block& block, size_t rows) {
+    // map<string, int>
+    DataTypePtr map_key_type = 
DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false);
+    DataTypePtr map_value_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+    DataTypePtr map_type =
+            make_nullable(std::make_shared<DataTypeMap>(map_key_type, 
map_value_type));
+
+    // map<string, int> column
+    {
+        MutableColumnPtr col = map_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 7 == 0) {
+                // null map
+                nullable_col->insert_default();
+            } else {
+                null_map.push_back(0);
+                auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
+                auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
+                auto& values = assert_cast<ColumnMap&>(nested).get_values();
+
+                auto mutable_keys = keys.assume_mutable();
+                auto mutable_values = values.assume_mutable();
+
+                std::string k1 = "k" + std::to_string(i);
+                std::string k2 = "k" + std::to_string(i + 1);
+                
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k1));
+                mutable_values->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+                
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k2));
+                mutable_values->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+
+                offsets.push_back(offsets.empty() ? 2 : offsets.back() + 2);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), map_type, 
"map_col"));
+    }
+}
+
+static void fill_struct_column(Block& block, size_t rows) {
+    // struct<si:int, ss:string>
+    DataTypes struct_fields;
+    struct_fields.emplace_back(
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
+    struct_fields.emplace_back(
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false)));
+    DataTypePtr struct_type = 
make_nullable(std::make_shared<DataTypeStruct>(struct_fields));
+
+    // struct<si:int, ss:string> column
+    {
+        MutableColumnPtr col = struct_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+
+        auto& struct_col = assert_cast<ColumnStruct&>(nested);
+        const auto& fields = struct_col.get_columns();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 6 == 0) {
+                nullable_col->insert_default();
+            } else {
+                null_map.push_back(0);
+                auto mutable_field0 = fields[0]->assume_mutable();
+                auto mutable_field1 = fields[1]->assume_mutable();
+                // int field
+                
mutable_field0->insert(Field::create_field<PrimitiveType::TYPE_INT>(
+                        static_cast<int32_t>(i * 100)));
+                // string field
+                std::string vs = "ss" + std::to_string(i);
+                
mutable_field1->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(vs));
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), struct_type, 
"struct_col"));
+    }
+}
+
+static void fill_variant_column(Block& block, size_t rows) {
+    // variant
+    DataTypePtr variant_type = 
make_nullable(std::make_shared<DataTypeVariant>());
+
+    // variant column: use JSON strings + deserialize_column_from_json_vector 
to populate ColumnVariant
+    {
+        MutableColumnPtr col = variant_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column(); // ColumnVariant
+
+        // Prepare JSON strings with variable number of keys per row
+        std::vector<std::string> json_rows;
+        json_rows.reserve(rows);
+        std::vector<Slice> slices;
+        slices.reserve(rows);
+
+        size_t key_cnt = 100;
+        for (size_t i = 0; i < rows; ++i) {
+            // fixed key count (key_cnt = 100) per row for wide-object coverage
+            std::string json = "{";
+            for (size_t k = 0; k < key_cnt; ++k) {
+                if (k > 0) {
+                    json += ",";
+                }
+                // keys: "k0", "k1", ...
+                json += "\"k" + std::to_string(k) + "\":";
+                // mix int and string values
+                if ((i + k) % 2 == 0) {
+                    json += std::to_string(static_cast<int32_t>(i * 10 + k));
+                } else {
+                    json += "\"v" + std::to_string(i * 10 + k) + "\"";
+                }
+            }
+            json += "}";
+            json_rows.emplace_back(std::move(json));
+            slices.emplace_back(json_rows.back().data(), 
json_rows.back().size());
+        }
+
+        // Use Variant SerDe to parse JSON into ColumnVariant
+        auto variant_type_inner = std::make_shared<DataTypeVariant>();
+        auto serde = variant_type_inner->get_serde();
+        auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());
+
+        uint64_t num_deserialized = 0;
+        DataTypeSerDe::FormatOptions options;
+        Status st = variant_serde->deserialize_column_from_json_vector(nested, 
slices,
+                                                                       
&num_deserialized, options);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(rows, num_deserialized);
+
+        // All rows are treated as non-null here
+        auto& null_map = nullable_col->get_null_map_data();
+        null_map.clear();
+        null_map.resize_fill(rows, 0);
+
+        block.insert(ColumnWithTypeAndName(std::move(col), variant_type, 
"variant_col"));
+    }
+}
+
+static void fill_complex_columns(Block& block, size_t rows) {
+    fill_array_column(block, rows);
+    fill_map_column(block, rows);
+    fill_struct_column(block, rows);
+    fill_variant_column(block, rows);
+}
+
+static Block create_test_block(size_t rows) {
+    Block block;
+    // simple schema with primitive + complex types:
+    // int_col, str_col, arr_col, map_col, struct_col, variant_col
+    fill_primitive_columns(block, rows);
+    fill_complex_columns(block, rows);
+
+    return block;
+}
+
+TEST_F(NativeReaderWriterTest, round_trip_native_file) {
+    // Prepare a temporary local file path
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_" + uuid + ".native";
+
+    // 1. Write block to Native file via VNativeTransformer
+    auto fs = io::global_local_filesystem();
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs; // empty, VNativeTransformer won't use it directly
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    Block src_block = create_test_block(16);
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    // VNativeTransformer::close() will also close the underlying FileWriter,
+    // so we don't need (and must not) close file_writer again here.
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // 2. Read back via NativeReader using normal file scan path
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, 4064, nullptr, 
&state);
+
+    Block dst_block;
+    size_t read_rows = 0;
+    bool eof = false;
+    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(src_block.rows(), read_rows);
+    ASSERT_EQ(src_block.columns(), dst_block.columns());
+
+    // Compare column-wise values
+    for (size_t col = 0; col < src_block.columns(); ++col) {
+        const auto& src = src_block.get_by_position(col);
+        const auto& dst = dst_block.get_by_position(col);
+        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
+        ASSERT_EQ(src.column->size(), dst.column->size());
+        for (size_t row = 0; row < src_block.rows(); ++row) {
+            auto src_field = (*src.column)[row];
+            auto dst_field = (*dst.column)[row];
+            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " 
row=" << row;
+        }
+    }
+
+    // Next call should hit EOF
+    Block dst_block2;
+    read_rows = 0;
+    eof = false;
+    st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(eof);
+    ASSERT_EQ(read_rows, 0);
+
+    // Clean up temp file
+    bool exists = false;
+    st = fs->exists(file_path, &exists);
+    if (st.ok() && exists) {
+        static_cast<void>(fs->delete_file(file_path));
+    }
+}
+
+// Edge cases: empty file and single-row file
+TEST_F(NativeReaderWriterTest, round_trip_empty_and_single_row) {
+    auto fs = io::global_local_filesystem();
+
+    // 1. Empty native file (no data blocks)
+    {
+        UniqueId uid = UniqueId::gen_uid();
+        std::string uuid = uid.to_string();
+        std::string file_path = "./native_format_empty_" + uuid + ".native";
+
+        io::FileWriterPtr file_writer;
+        Status st = fs->create_file(file_path, &file_writer);
+        ASSERT_TRUE(st.ok()) << st;
+
+        RuntimeState state;
+        VExprContextSPtrs exprs;
+        VNativeTransformer transformer(&state, file_writer.get(), exprs, 
false);
+        st = transformer.open();
+        ASSERT_TRUE(st.ok()) << st;
+
+        // Write an empty block, should not produce any data block in file.
+        Block empty_block;
+        st = transformer.write(empty_block);
+        ASSERT_TRUE(st.ok()) << st;
+        st = transformer.close();
+        ASSERT_TRUE(st.ok()) << st;
+
+        // Read back: should directly hit EOF with 0 rows.
+        TFileScanRangeParams scan_params;
+        scan_params.__set_file_type(TFileType::FILE_LOCAL);
+        TFileRangeDesc scan_range;
+        scan_range.__set_path(file_path);
+        scan_range.__set_file_type(TFileType::FILE_LOCAL);
+        NativeReader reader_impl(nullptr, scan_params, scan_range, 4096, 
nullptr, &state);
+
+        Block dst_block;
+        size_t read_rows = 0;
+        bool eof = false;
+        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(0U, read_rows);
+        ASSERT_TRUE(eof);
+
+        bool exists = false;
+        st = fs->exists(file_path, &exists);
+        if (st.ok() && exists) {
+            static_cast<void>(fs->delete_file(file_path));
+        }
+    }
+
+    // 2. Single-row native file
+    {
+        UniqueId uid = UniqueId::gen_uid();
+        std::string uuid = uid.to_string();
+        std::string file_path = "./native_format_single_row_" + uuid + 
".native";
+
+        io::FileWriterPtr file_writer;
+        Status st = fs->create_file(file_path, &file_writer);
+        ASSERT_TRUE(st.ok()) << st;
+
+        RuntimeState state;
+        VExprContextSPtrs exprs;
+        VNativeTransformer transformer(&state, file_writer.get(), exprs, 
false);
+        st = transformer.open();
+        ASSERT_TRUE(st.ok()) << st;
+
+        Block src_block = create_test_block(1);
+        st = transformer.write(src_block);
+        ASSERT_TRUE(st.ok()) << st;
+        st = transformer.close();
+        ASSERT_TRUE(st.ok()) << st;
+
+        TFileScanRangeParams scan_params;
+        scan_params.__set_file_type(TFileType::FILE_LOCAL);
+        TFileRangeDesc scan_range;
+        scan_range.__set_path(file_path);
+        scan_range.__set_file_type(TFileType::FILE_LOCAL);
+        NativeReader reader_impl(nullptr, scan_params, scan_range, 4096, 
nullptr, &state);
+
+        Block dst_block;
+        size_t read_rows = 0;
+        bool eof = false;
+        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_EQ(src_block.rows(), read_rows);
+        ASSERT_EQ(src_block.columns(), dst_block.columns());
+        ASSERT_FALSE(eof);
+
+        // Verify data equality for the single row.
+        for (size_t col = 0; col < src_block.columns(); ++col) {
+            const auto& src = src_block.get_by_position(col);
+            const auto& dst = dst_block.get_by_position(col);
+            ASSERT_EQ(src.type->get_family_name(), 
dst.type->get_family_name());
+            ASSERT_EQ(src.column->size(), dst.column->size());
+            auto src_field = (*src.column)[0];
+            auto dst_field = (*dst.column)[0];
+            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " 
row=0";
+        }
+
+        // Next call should hit EOF
+        Block dst_block2;
+        read_rows = 0;
+        eof = false;
+        st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        ASSERT_TRUE(eof);
+        ASSERT_EQ(read_rows, 0U);
+
+        bool exists = false;
+        st = fs->exists(file_path, &exists);
+        if (st.ok() && exists) {
+            static_cast<void>(fs->delete_file(file_path));
+        }
+    }
+}
+
+// Large volume test: verify round-trip correctness with many rows.
+TEST_F(NativeReaderWriterTest, round_trip_native_file_large_rows) {
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = "./native_format_large_" + uuid + ".native";
+
+    auto fs = io::global_local_filesystem();
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Use a relatively large number of rows to test stability and performance 
characteristics.
+    const size_t kRows = 9999;
+    Block src_block = create_test_block(kRows);
+
+    // Split into multiple blocks and write them one by one to exercise 
multi-block path.
+    const size_t kBatchRows = 128;
+    for (size_t offset = 0; offset < kRows; offset += kBatchRows) {
+        size_t len = std::min(kBatchRows, kRows - offset);
+        // Create a sub block with the same schema and a row range [offset, 
offset+len)
+        Block sub_block = src_block.clone_empty();
+        for (size_t col = 0; col < src_block.columns(); ++col) {
+            const auto& src_col = *src_block.get_by_position(col).column;
+            const auto& dst_col_holder = 
*sub_block.get_by_position(col).column;
+            auto dst_mutable = dst_col_holder.assume_mutable();
+            dst_mutable->insert_range_from(src_col, offset, len);
+        }
+        st = transformer.write(sub_block);
+        ASSERT_TRUE(st.ok()) << st;
+    }
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, 8192, nullptr, 
&state);
+
+    // Read back in multiple blocks and merge into a single result block.
+    Block merged_block;
+    bool first_block = true;
+    size_t total_read_rows = 0;
+    while (true) {
+        Block dst_block;
+        size_t read_rows = 0;
+        bool eof = false;
+        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+        ASSERT_TRUE(st.ok()) << st;
+        if (read_rows > 0) {
+            if (first_block) {
+                merged_block = dst_block;
+                total_read_rows = read_rows;
+                first_block = false;
+            } else {
+                MutableBlock merged_mutable(&merged_block);
+                Status add_st = merged_mutable.add_rows(&dst_block, 0, 
read_rows);
+                ASSERT_TRUE(add_st.ok()) << add_st;
+                total_read_rows += read_rows;
+            }
+        }
+        if (eof) {
+            break;
+        }
+        if (read_rows == 0) {
+            break;
+        }
+    }
+
+    ASSERT_EQ(src_block.rows(), total_read_rows);
+    ASSERT_EQ(src_block.columns(), merged_block.columns());
+
+    // Compare column-wise values
+    for (size_t col = 0; col < src_block.columns(); ++col) {
+        const auto& src = src_block.get_by_position(col);
+        const auto& dst = merged_block.get_by_position(col);
+        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
+        ASSERT_EQ(src.column->size(), dst.column->size());
+        for (size_t row = 0; row < src_block.rows(); row += 10) {
+            auto src_field = (*src.column)[row];
+            auto dst_field = (*dst.column)[row];
+            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " 
row=" << row;
+        }
+    }
+    bool exists = false;
+    st = fs->exists(file_path, &exists);
+    if (st.ok() && exists) {
+        static_cast<void>(fs->delete_file(file_path));
+    }
+}
+
+} // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index 2cd5852dea4..9b17d06708d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -30,6 +30,7 @@ public class FileFormatConstants {
     public static final String FORMAT_AVRO = "avro";
     public static final String FORMAT_WAL = "wal";
     public static final String FORMAT_ARROW = "arrow";
+    public static final String FORMAT_NATIVE = "native";
 
     public static final String PROP_FORMAT = "format";
     public static final String PROP_COLUMN_SEPARATOR = "column_separator";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 3b96e7b3b54..7ec4456eddf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -575,6 +575,9 @@ public class Util {
             return TFileFormatType.FORMAT_WAL;
         } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ARROW)) {
             return TFileFormatType.FORMAT_ARROW;
+        } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_NATIVE)) {
+            // Doris Native binary columnar format
+            return TFileFormatType.FORMAT_NATIVE;
         } else {
             return TFileFormatType.FORMAT_UNKNOWN;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
index 865da510d23..f3f1bb1a6d8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -39,6 +39,7 @@ public abstract class FileFormatProperties {
     public static final String FORMAT_AVRO = "avro";
     public static final String FORMAT_WAL = "wal";
     public static final String FORMAT_ARROW = "arrow";
+    public static final String FORMAT_NATIVE = "native";
     public static final String PROP_COMPRESS_TYPE = "compress_type";
 
     protected String formatName;
@@ -98,6 +99,8 @@ public abstract class FileFormatProperties {
                 return new WalFileFormatProperties();
             case FORMAT_ARROW:
                 return new ArrowFileFormatProperties();
+            case FORMAT_NATIVE:
+                return new NativeFileFormatProperties();
             default:
                 throw new AnalysisException("format:" + formatString + " is 
not supported.");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
new file mode 100644
index 00000000000..fe4306906d4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+/**
+ * File format properties for Doris Native binary columnar format.
+ *
+ * This format is intended for high-performance internal data exchange.
+ */
+
+public class NativeFileFormatProperties extends FileFormatProperties {
+
+    public NativeFileFormatProperties() {
+        super(TFileFormatType.FORMAT_NATIVE, 
FileFormatProperties.FORMAT_NATIVE);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> 
formatProperties,
+                                            boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        // Currently no extra user visible properties for native format.
+        // Just ignore all other properties gracefully.
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) 
{
+        // No extra sink options are required for native format.
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        // For now we don't need text params for Native format, but 
TFileAttributes
+        // requires a text_params field to be non-null on BE side.
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(textParams);
+        return fileAttributes;
+    }
+}
+
+
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java
index e8498b0aff7..7e24576edf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java
@@ -327,7 +327,7 @@ public class LoadExprTransformUtils {
             } else {
                 SlotDescriptor slotDesc = 
analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
 
-                if (formatType == TFileFormatType.FORMAT_ARROW) {
+                if (formatType == TFileFormatType.FORMAT_ARROW || formatType 
== TFileFormatType.FORMAT_NATIVE) {
                     slotDesc.setColumn(new Column(realColName, 
colToType.get(realColName)));
                 } else {
                     if (uniquekeyUpdateMode == 
TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && hasSkipBitmapColumn) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 9f9f769c780..7d758d1acaa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.Pair;
@@ -324,7 +325,8 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
      * @return column type and the number of parsed PTypeNodes
      */
     private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int 
start) {
-        PScalarType columnType = typeNodes.get(start).getScalarType();
+        PTypeNode typeNode = typeNodes.get(start);
+        PScalarType columnType = typeNode.getScalarType();
         TPrimitiveType tPrimitiveType = 
TPrimitiveType.findByValue(columnType.getType());
         Type type;
         int parsedNodes;
@@ -356,6 +358,15 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
                 parsedNodes += fieldType.value();
             }
             type = new StructType(fields);
+        } else if (tPrimitiveType == TPrimitiveType.VARIANT) {
+            // Preserve VARIANT-specific properties from PTypeNode, especially 
variant_max_subcolumns_count.
+            int maxSubcolumns = typeNode.getVariantMaxSubcolumnsCount();
+            // Currently no predefined fields are carried in PTypeNode for 
VARIANT, so use empty list and default
+            // values for other properties.
+            type = new VariantType(new ArrayList<>(), maxSubcolumns,
+                    /*enableTypedPathsToSparse*/ false,
+                    /*variantMaxSparseColumnStatisticsSize*/ 10000);
+            parsedNodes = 1;
         } else {
             type = 
ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType),
                     columnType.getLen(), columnType.getPrecision(), 
columnType.getScale());
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 908c7d10530..7b70e5da4c8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -122,7 +122,8 @@ enum TFileFormatType {
     FORMAT_CSV_SNAPPYBLOCK,
     FORMAT_WAL,
     FORMAT_ARROW,
-    FORMAT_TEXT
+    FORMAT_TEXT,
+    FORMAT_NATIVE
 }
 
 // In previous versions, the data compression format and file format were 
stored together, as TFileFormatType,
diff --git 
a/regression-test/data/export_p0/outfile/native/test_outfile_native.out 
b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
new file mode 100644
index 00000000000..fc60340a7e1
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_default --
+1      2024-01-01      2024-01-01T00:00        s1      1       1       true    
1.1
+2      2024-01-01      2024-01-01T00:00        s2      2       2       true    
2.2
+3      2024-01-01      2024-01-01T00:00        s3      3       3       true    
3.3
+4      2024-01-01      2024-01-01T00:00        s4      4       4       true    
4.4
+5      2024-01-01      2024-01-01T00:00        s5      5       5       true    
5.5
+6      2024-01-01      2024-01-01T00:00        s6      6       6       true    
6.6
+7      2024-01-01      2024-01-01T00:00        s7      7       7       true    
7.7
+8      2024-01-01      2024-01-01T00:00        s8      8       8       true    
8.800000000000001
+9      2024-01-01      2024-01-01T00:00        s9      9       9       true    
9.9
+10     2024-01-01      2024-01-01T00:00        s10     10      10      true    
10.1
+
+-- !select_s3_native --
+1      2024-01-01      2024-01-01T00:00        s1      1       1       true    
1.1
+2      2024-01-01      2024-01-01T00:00        s2      2       2       true    
2.2
+3      2024-01-01      2024-01-01T00:00        s3      3       3       true    
3.3
+4      2024-01-01      2024-01-01T00:00        s4      4       4       true    
4.4
+5      2024-01-01      2024-01-01T00:00        s5      5       5       true    
5.5
+6      2024-01-01      2024-01-01T00:00        s6      6       6       true    
6.6
+7      2024-01-01      2024-01-01T00:00        s7      7       7       true    
7.7
+8      2024-01-01      2024-01-01T00:00        s8      8       8       true    
8.800000000000001
+9      2024-01-01      2024-01-01T00:00        s9      9       9       true    
9.9
+10     2024-01-01      2024-01-01T00:00        s10     10      10      true    
10.1
+
diff --git 
a/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy 
b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
new file mode 100644
index 00000000000..26c9e9ab704
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_native", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def tableName = "outfile_native_test"
+    def outFilePath = "${bucket}/outfile/native/exp_"
+
+    // Export helper: writes to S3 and returns the URL reported by the FE
+    def outfile_to_s3 = {
+        def res = sql """
+            SELECT * FROM ${tableName} t ORDER BY id
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS native
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        return res[0][3]
+    }
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `id` INT NOT NULL,
+            `c_date` DATE NOT NULL,
+            `c_dt` DATETIME NOT NULL,
+            `c_str` VARCHAR(20),
+            `c_int` INT,
+            `c_tinyint` TINYINT,
+            `c_bool` boolean,
+            `c_double` double
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+
+        // Insert 10000 test rows (the last row is all NULL)
+        StringBuilder sb = new StringBuilder()
+        int i = 1
+        for (; i < 10000; i ++) {
+            sb.append("""
+                (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i 
% 128}, true, ${i}.${i}),
+            """)
+        }
+        sb.append("""
+                (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, NULL, 
NULL, NULL)
+            """)
+        sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """
+
+        // baseline: query result from the local table
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY id limit 
10; """
+
+        // 1) Export to S3 as native-format files
+        def outfileUrl = outfile_to_s3()
+
+        // 2) Query the data back from S3 via the S3 TVF (format=native) and compare with the baseline
+        // outfileUrl looks like: s3://bucket/outfile/native/exp_xxx_* ; strip the
+        // "s3://bucket" prefix and the trailing '*'
+        qt_select_s3_native """ SELECT * FROM S3 (
+                "uri" = "http://${bucket}.${s3_endpoint}${
+                        outfileUrl.substring(5 + bucket.length(), 
outfileUrl.length() - 1)
+                    }0.native",
+                "ACCESS_KEY"= "${ak}",
+                "SECRET_KEY" = "${sk}",
+                "format" = "native",
+                "region" = "${region}"
+            ) order by id limit 10;
+            """
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy 
b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
new file mode 100644
index 00000000000..cf448a064e2
--- /dev/null
+++ b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.io.File
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+suite("test_export_variant_10k_columns", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    // String bucket = context.config.otherConfigs.get("s3BucketName");
+    String bucket = getS3BucketName()
+
+    def table_export_name = "test_export_variant_10k"
+    def table_load_name = "test_load_variant_10k"
+    def outfile_path_prefix = """${bucket}/export/p0/variant_10k/exp"""
+
+    def waiting_export = { export_label ->
+        while (true) {
+            def res = sql """ show export where label = "${export_label}" """
+            logger.info("export state: " + res[0][2])
+            if (res[0][2] == "FINISHED") {
+                def json = parseJson(res[0][11])
+                assert json instanceof List
+                // assertEquals("1", json.fileNumber[0][0])
+                log.info("outfile_path: ${json.url[0][0]}")
+                return json.url[0][0];
+            } else if (res[0][2] == "CANCELLED") {
+                throw new IllegalStateException("""export failed: 
${res[0][10]}""")
+            } else {
+                sleep(5000)
+            }
+        }
+    }
+
+    // 1. Create table with variant column
+    sql """ DROP TABLE IF EXISTS ${table_export_name} """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${table_export_name} (
+        `id` INT NOT NULL,
+        `v` VARIANT<PROPERTIES ("variant_max_subcolumns_count" = "2048")> NULL
+    )
+    DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+    """
+
+    // 2. Generate data with 10000 keys in variant
+    // Generate num_rows rows (currently 1000; see num_rows below).
+    // Total 10,000 columns, but each row only has 50 columns (sparse).
+    // This simulates a realistic sparse wide table scenario.
+
+    File dataFile = File.createTempFile("variant_10k_data", ".json")
+    dataFile.deleteOnExit()
+    int num_rows = 1000
+    try {
+        dataFile.withWriter { writer ->
+            StringBuilder sb = new StringBuilder()
+            for (int i = 1; i <= num_rows; i++) {
+                sb.setLength(0)
+                sb.append("{\"id\": ").append(i).append(", \"v\": {")
+                // Select 50 keys out of 10000 for each row
+                for (int k = 0; k < 50; k++) {
+                    if (k > 0) sb.append(", ")
+                    // Scatter the keys to ensure coverage of all 10000 
columns across rows
+                    int keyIdx = (i + k * 200) % 10000
+                    sb.append('"k').append(keyIdx).append('":').append(i)
+                }
+                sb.append("}}\n")
+                writer.write(sb.toString())
+            }
+        }
+
+        // 3. Stream Load
+        streamLoad {
+            table table_export_name
+            set 'format', 'json'
+            set 'read_json_by_line', 'true'
+            file dataFile.getAbsolutePath()
+            time 60000 // 60s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(num_rows, json.NumberTotalRows)
+                assertEquals(num_rows, json.NumberLoadedRows)
+            }
+        }
+    } finally {
+        dataFile.delete()
+    }
+
+    // def format = "parquet"
+    def format = "native"
+
+    // 4. Export to S3 in the selected format (native)
+    def uuid = UUID.randomUUID().toString()
+    // def outFilePath = """/tmp/variant_10k_export"""
+    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def label = "label_${uuid}"
+
+    try {
+        // for (int i = 0; i < 10; i++) {
+        //     // exec export
+        //     sql """
+        //         EXPORT TABLE ${table_export_name} TO 
"file://${outFilePath}/"
+        //         PROPERTIES(
+        //             "label" = "${label}_${i}",
+        //             "format" = "${format}"
+        //         ) 
+        //     """
+        //     long startExport = System.currentTimeMillis()
+        //     def outfile_url = waiting_export.call("${label}_${i}")
+        //     long endExport = System.currentTimeMillis()
+        //     logger.info("Export 100k rows with variant took ${endExport - 
startExport} ms")
+        // }
+        // exec export
+        // sql """
+        //     EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
+        //     PROPERTIES(
+        //         "label" = "${label}",
+        //         "format" = "${format}"
+        //     ) 
+        // """
+
+        sql """
+            EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/"
+            PROPERTIES(
+                "label" = "${label}",
+                "format" = "${format}"
+            )
+            WITH S3(
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${getS3Provider()}"
+            );
+        """
+        
+        long startExport = System.currentTimeMillis()
+        def outfile_url = waiting_export.call(label)
+        long endExport = System.currentTimeMillis()
+        logger.info("Export ${num_rows} rows with variant took ${endExport - 
startExport} ms")
+        
+        // 5. Validate by S3 TVF
+        def s3_tvf_sql = """ s3(
+                "uri" = 
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}0.${format}",
+                "s3.access_key"= "${ak}",
+                "s3.secret_key" = "${sk}",
+                "format" = "${format}",
+                "provider" = "${getS3Provider()}",
+                "region" = "${region}"
+            ) """
+
+        // Verify row count
+        def res = sql """ SELECT count(*) FROM ${s3_tvf_sql} """
+        assertEquals(num_rows, res[0][0])
+
+        def value_type = "VARIANT<PROPERTIES (\"variant_max_subcolumns_count\" 
= \"2048\")>"
+        if (new Random().nextInt(2) == 0) {
+            value_type = "text"
+        }
+        // 6. Load back into Doris (to a new table) to verify import 
performance/capability
+        sql """ DROP TABLE IF EXISTS ${table_load_name} """
+        sql """
+        CREATE TABLE ${table_load_name} (
+            `id` INT,
+            `v` ${value_type}
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+        
+        // Use LOAD LABEL. The label contains '-' from UUID, so it must be 
quoted with back-quotes.
+        def load_label = "load_${uuid}"
+        
+        sql """
+            LOAD LABEL `${load_label}`
+            (
+                DATA INFILE("s3://${outFilePath}/*")
+                INTO TABLE ${table_load_name}
+                FORMAT AS "${format}"
+                (id, v)
+            )
+            WITH S3 (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${getS3Provider()}"
+            );
+        """
+        
+        Awaitility.await().atMost(240, SECONDS).pollInterval(5, 
SECONDS).until({
+            def loadResult = sql """
+           show load where label = '${load_label}'
+           """
+            if (loadResult.get(0).get(2) == 'CANCELLED' || 
loadResult.get(0).get(2) == 'FAILED') {
+                println("load failed: " + loadResult.get(0))
+                throw new RuntimeException("load failed"+ loadResult.get(0))
+            }
+            return loadResult.get(0).get(2) == 'FINISHED'
+        }) 
+        // Check if data loaded
+        def load_count = sql """ SELECT count(*) FROM ${table_load_name} """
+        assertEquals(num_rows, load_count[0][0])
+
+        // Check variant data integrity (sample)
+        // Row 1 has keys: (1 + k*200) % 10000. For k=0, key is k1. Value is 1.
+        def check_v = sql """ SELECT cast(v['k1'] as int) FROM 
${table_load_name} WHERE id = 1 """
+        assertEquals(1, check_v[0][0])
+
+    } finally {
+        // try_sql("DROP TABLE IF EXISTS ${table_export_name}")
+        // try_sql("DROP TABLE IF EXISTS ${table_load_name}")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to