Copilot commented on code in PR #58711:
URL: https://github.com/apache/doris/pull/58711#discussion_r2621732041


##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java:
##########
@@ -384,6 +386,16 @@ private Pair<Type, Integer> getColumnType(List<PTypeNode> 
typeNodes, int start)
                 parsedNodes += fieldType.value();
             }
             type = new StructType(fields);
+        } else if (tPrimitiveType == TPrimitiveType.VARIANT) {
+            // Preserve VARIANT-specific properties from PTypeNode, especially 
variant_max_subcolumns_count.
+            int maxSubcolumns = typeNode.getVariantMaxSubcolumnsCount();
+            // Currently no predefined fields are carried in PTypeNode for 
VARIANT, so use empty list and default
+            // values for other properties.
+            type = new VariantType(new ArrayList<>(), maxSubcolumns,
+                    /*enableTypedPathsToSparse*/ false,
+                    /*variantMaxSparseColumnStatisticsSize*/ 10000,
+                    /*variantSparseHashShardCount*/ 0);

Review Comment:
   The hardcoded values for VARIANT properties 
(variantMaxSparseColumnStatisticsSize=10000, variantSparseHashShardCount=0) 
should be documented or made configurable. These magic numbers appear to be 
defaults, but it's unclear whether they match the properties carried by the 
PTypeNode or whether they could cause issues in some scenarios.
   ```suggestion
               // Extract variantMaxSparseColumnStatisticsSize and 
variantSparseHashShardCount from PTypeNode if present,
               // otherwise use documented default values.
               int variantMaxSparseColumnStatisticsSize = 
typeNode.hasVariantMaxSparseColumnStatisticsSize()
                       ? typeNode.getVariantMaxSparseColumnStatisticsSize() : 
10000; // Default: 10000
               int variantSparseHashShardCount = 
typeNode.hasVariantSparseHashShardCount()
                       ? typeNode.getVariantSparseHashShardCount() : 0; // 
Default: 0
               type = new VariantType(new ArrayList<>(), maxSubcolumns,
                       /*enableTypedPathsToSparse*/ false,
                       variantMaxSparseColumnStatisticsSize,
                       variantSparseHashShardCount);
   ```



##########
regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_native", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def tableName = "outfile_native_test"
+    def outFilePath = "${bucket}/outfile/native/exp_"
+
+    // 导出 helper:写到 S3,返回 FE 输出的 URL
+    def outfile_to_s3 = {
+        def res = sql """
+            SELECT * FROM ${tableName} t ORDER BY id
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS native
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        return res[0][3]
+    }
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `id` INT NOT NULL,
+            `c_date` DATE NOT NULL,
+            `c_dt` DATETIME NOT NULL,
+            `c_str` VARCHAR(20),
+            `c_int` INT,
+            `c_tinyint` TINYINT,
+            `c_bool` boolean,
+            `c_double` double
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+
+        // 插入 10 行测试数据(最后一行全 NULL)

Review Comment:
   Chinese comment should be translated to English for consistency. The comment 
"插入 10 行测试数据(最后一行全 NULL)" should be translated.
   ```suggestion
           // Insert 10 rows of test data (the last row is all NULL)
   ```



##########
be/src/vec/runtime/vnative_transformer.cpp:
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vnative_transformer.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "agent/be_exec_version_manager.h"
+#include "io/fs/file_writer.h"
+#include "runtime/runtime_state.h"
+#include "util/slice.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/native/native_format.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+namespace {
+
+// Map high-level TFileCompressType to low-level segment_v2::CompressionTypePB.
+segment_v2::CompressionTypePB 
to_local_compression_type(TFileCompressType::type type) {
+    using CT = segment_v2::CompressionTypePB;
+    switch (type) {
+    case TFileCompressType::GZ:
+    case TFileCompressType::ZLIB:
+    case TFileCompressType::DEFLATE:
+        return CT::ZLIB;
+    case TFileCompressType::LZ4FRAME:
+    case TFileCompressType::LZ4BLOCK:
+        return CT::LZ4;
+    case TFileCompressType::SNAPPYBLOCK:
+        return CT::SNAPPY;
+    case TFileCompressType::ZSTD:
+        return CT::ZSTD;
+    default:
+        return CT::ZSTD;
+    }
+}
+
+} // namespace
+
+VNativeTransformer::VNativeTransformer(RuntimeState* state, 
doris::io::FileWriter* file_writer,
+                                       const VExprContextSPtrs& 
output_vexpr_ctxs,
+                                       bool output_object_data,
+                                       TFileCompressType::type compress_type)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _file_writer(file_writer),
+          _compression_type(to_local_compression_type(compress_type)) {}
+
+Status VNativeTransformer::open() {
+    // Write Doris Native file header:
+    // [magic bytes "DORISN1\0"][uint32_t format_version]
+    DCHECK(_file_writer != nullptr);
+    uint32_t version = DORIS_NATIVE_FORMAT_VERSION;
+
+    Slice magic_slice(const_cast<char*>(DORIS_NATIVE_MAGIC), 
sizeof(DORIS_NATIVE_MAGIC));
+    Slice version_slice(reinterpret_cast<char*>(&version), sizeof(uint32_t));
+
+    RETURN_IF_ERROR(_file_writer->append(magic_slice));
+    RETURN_IF_ERROR(_file_writer->append(version_slice));
+
+    _written_len += sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t);
+    return Status::OK();
+}
+
+Status VNativeTransformer::write(const Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    // Serialize Block into PBlock using existing vec serialization logic.
+    PBlock pblock;
+    size_t uncompressed_bytes = 0;
+    size_t compressed_bytes = 0;
+    int64_t compressed_time = 0;
+
+    
RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(), 
&pblock,
+                                    &uncompressed_bytes, &compressed_bytes, 
&compressed_time,
+                                    _compression_type));
+
+    std::string buff;
+    if (!pblock.SerializeToString(&buff)) {
+        auto err = Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
+                "serialize native block error. block rows: {}", block.rows());
+        return err;
+    }
+
+    // Layout of Doris Native file:
+    // [uint64_t block_size][PBlock bytes]...
+    uint64_t len = buff.size();
+    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));

Review Comment:
   Passing the address of a local variable to Slice could be dangerous if the 
Slice outlives the variable. The 'len' variable is on the stack and its address 
is passed to len_slice. While this appears safe in this specific case since the 
Slice is immediately consumed, it would be safer to document this constraint or 
restructure the code to make the lifetime relationship clearer.



##########
be/src/vec/functions/cast/cast_to_variant.h:
##########
@@ -120,13 +136,29 @@ struct CastToVariant {
 // create cresponding variant value to wrap from_type
 WrapperType create_cast_to_variant_wrapper(const DataTypePtr& from_type,

Review Comment:
   The typo 'cresponding' should be corrected to 'corresponding' in the comment.



##########
be/src/vec/runtime/vnative_transformer.cpp:
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vnative_transformer.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "agent/be_exec_version_manager.h"
+#include "io/fs/file_writer.h"
+#include "runtime/runtime_state.h"
+#include "util/slice.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/native/native_format.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+namespace {
+
+// Map high-level TFileCompressType to low-level segment_v2::CompressionTypePB.
+segment_v2::CompressionTypePB 
to_local_compression_type(TFileCompressType::type type) {
+    using CT = segment_v2::CompressionTypePB;
+    switch (type) {
+    case TFileCompressType::GZ:
+    case TFileCompressType::ZLIB:
+    case TFileCompressType::DEFLATE:
+        return CT::ZLIB;
+    case TFileCompressType::LZ4FRAME:
+    case TFileCompressType::LZ4BLOCK:
+        return CT::LZ4;
+    case TFileCompressType::SNAPPYBLOCK:
+        return CT::SNAPPY;
+    case TFileCompressType::ZSTD:
+        return CT::ZSTD;
+    default:
+        return CT::ZSTD;
+    }
+}
+
+} // namespace
+
+VNativeTransformer::VNativeTransformer(RuntimeState* state, 
doris::io::FileWriter* file_writer,
+                                       const VExprContextSPtrs& 
output_vexpr_ctxs,
+                                       bool output_object_data,
+                                       TFileCompressType::type compress_type)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _file_writer(file_writer),
+          _compression_type(to_local_compression_type(compress_type)) {}
+
+Status VNativeTransformer::open() {
+    // Write Doris Native file header:
+    // [magic bytes "DORISN1\0"][uint32_t format_version]
+    DCHECK(_file_writer != nullptr);
+    uint32_t version = DORIS_NATIVE_FORMAT_VERSION;
+
+    Slice magic_slice(const_cast<char*>(DORIS_NATIVE_MAGIC), 
sizeof(DORIS_NATIVE_MAGIC));

Review Comment:
   Using const_cast to remove const from a string literal is dangerous. 
DORIS_NATIVE_MAGIC is a string literal with static storage duration, and the 
Slice constructor should accept a const char* pointer. Consider either defining 
DORIS_NATIVE_MAGIC as a non-const array or using a different approach that 
doesn't require casting away constness.
   ```suggestion
       Slice magic_slice(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC));
   ```



##########
be/src/vec/functions/cast/cast_to_variant.h:
##########
@@ -120,13 +136,29 @@ struct CastToVariant {
 // create cresponding variant value to wrap from_type
 WrapperType create_cast_to_variant_wrapper(const DataTypePtr& from_type,
                                            const DataTypeVariant& to_type) {
+    if (from_type->get_primitive_type() == TYPE_VARIANT) {
+        // variant_max_subcolumns_count is not equal
+        return create_unsupport_wrapper(from_type->get_name(), 
to_type.get_name());
+    }
     return &CastToVariant::execute;
 }
 
 // create cresponding type convert from variant
 WrapperType create_cast_from_variant_wrapper(const DataTypeVariant& from_type,

Review Comment:
   The typo 'cresponding' should be corrected to 'corresponding' in the comment.



##########
regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_native", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def tableName = "outfile_native_test"
+    def outFilePath = "${bucket}/outfile/native/exp_"
+
+    // 导出 helper:写到 S3,返回 FE 输出的 URL
+    def outfile_to_s3 = {
+        def res = sql """
+            SELECT * FROM ${tableName} t ORDER BY id
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS native
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        return res[0][3]
+    }
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `id` INT NOT NULL,
+            `c_date` DATE NOT NULL,
+            `c_dt` DATETIME NOT NULL,
+            `c_str` VARCHAR(20),
+            `c_int` INT,
+            `c_tinyint` TINYINT,
+            `c_bool` boolean,
+            `c_double` double
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+
+        // 插入 10 行测试数据(最后一行全 NULL)
+        StringBuilder sb = new StringBuilder()
+        int i = 1
+        for (; i < 10000; i ++) {
+            sb.append("""
+                (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i 
% 128}, true, ${i}.${i}),
+            """)
+        }
+        sb.append("""
+                (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, NULL, 
NULL, NULL)
+            """)
+        sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """
+
+        // baseline:本地表查询结果

Review Comment:
   Chinese comment should be translated to English for consistency. The comment 
"baseline:本地表查询结果" should be translated.
   ```suggestion
           // baseline: local table query result
   ```



##########
regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_native", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def tableName = "outfile_native_test"
+    def outFilePath = "${bucket}/outfile/native/exp_"
+
+    // 导出 helper:写到 S3,返回 FE 输出的 URL

Review Comment:
   Chinese comment should be translated to English for consistency with the 
rest of the codebase. The comment "导出 helper:写到 S3,返回 FE 输出的 URL" should be 
translated.
   ```suggestion
       // Export helper: write to S3 and return the URL output by FE
   ```



##########
be/test/vec/exec/format/native/native_reader_writer_test.cpp:
##########
@@ -0,0 +1,1359 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/path.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/runtime_state.h"
+#include "util/jsonb_writer.h"
+#include "util/uid_util.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_struct.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/data_types/serde/data_type_serde.h"
+#include "vec/exec/format/native/native_format.h"
+#include "vec/exec/format/native/native_reader.h"
+#include "vec/runtime/vnative_transformer.h"
+
+namespace doris::vectorized {
+
+class NativeReaderWriterTest : public ::testing::Test {};
+
+static void fill_primitive_columns(Block& block, size_t rows) {
+    DataTypePtr int_type =
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
+    DataTypePtr str_type =
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false));
+
+    {
+        MutableColumnPtr col = int_type->create_column();
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 3 == 0) {
+                // null
+                col->insert_default();
+            } else {
+                // insert int value via Field to match column interface
+                auto field =
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i * 10));
+                col->insert(field);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), int_type, 
"int_col"));
+    }
+
+    {
+        MutableColumnPtr col = str_type->create_column();
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 4 == 0) {
+                col->insert_default();
+            } else {
+                std::string v = "s" + std::to_string(i);
+                // insert varchar value via Field
+                auto field = 
Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
+                col->insert(field);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), str_type, 
"str_col"));
+    }
+}
+
+static void fill_array_column(Block& block, size_t rows) {
+    // array<int>
+    DataTypePtr arr_nested_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+    DataTypePtr arr_type = 
make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
+
+    {
+        MutableColumnPtr col = arr_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+        auto& array_col = assert_cast<ColumnArray&>(nested);
+        auto& offsets = array_col.get_offsets();
+        auto& data = array_col.get_data();
+        auto mutable_data = data.assume_mutable();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 5 == 0) {
+                // null array
+                nullable_col->insert_default();
+            } else {
+                // non-null array with 3 elements: [i, i+1, i+2]
+                null_map.push_back(0);
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+                mutable_data->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 2)));
+                offsets.push_back(offsets.empty() ? 3 : offsets.back() + 3);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), arr_type, 
"arr_col"));
+    }
+}
+
+static void fill_map_column(Block& block, size_t rows) {
+    // map<string, int>
+    DataTypePtr map_key_type = 
DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false);
+    DataTypePtr map_value_type = 
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+    DataTypePtr map_type =
+            make_nullable(std::make_shared<DataTypeMap>(map_key_type, 
map_value_type));
+
+    // map<string, int> column
+    {
+        MutableColumnPtr col = map_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 7 == 0) {
+                // null map
+                nullable_col->insert_default();
+            } else {
+                null_map.push_back(0);
+                auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
+                auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
+                auto& values = assert_cast<ColumnMap&>(nested).get_values();
+
+                auto mutable_keys = keys.assume_mutable();
+                auto mutable_values = values.assume_mutable();
+
+                std::string k1 = "k" + std::to_string(i);
+                std::string k2 = "k" + std::to_string(i + 1);
+                
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k1));
+                mutable_values->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+                
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k2));
+                mutable_values->insert(
+                        
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+
+                offsets.push_back(offsets.empty() ? 2 : offsets.back() + 2);
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), map_type, 
"map_col"));
+    }
+}
+
+static void fill_struct_column(Block& block, size_t rows) {
+    // struct<si:int, ss:string>
+    DataTypes struct_fields;
+    struct_fields.emplace_back(
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
+    struct_fields.emplace_back(
+            
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, 
false)));
+    DataTypePtr struct_type = 
make_nullable(std::make_shared<DataTypeStruct>(struct_fields));
+
+    // struct<si:int, ss:string> column
+    {
+        MutableColumnPtr col = struct_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column();
+        auto& null_map = nullable_col->get_null_map_data();
+
+        auto& struct_col = assert_cast<ColumnStruct&>(nested);
+        const auto& fields = struct_col.get_columns();
+
+        for (size_t i = 0; i < rows; ++i) {
+            if (i % 6 == 0) {
+                nullable_col->insert_default();
+            } else {
+                null_map.push_back(0);
+                auto mutable_field0 = fields[0]->assume_mutable();
+                auto mutable_field1 = fields[1]->assume_mutable();
+                // int field
+                
mutable_field0->insert(Field::create_field<PrimitiveType::TYPE_INT>(
+                        static_cast<int32_t>(i * 100)));
+                // string field
+                std::string vs = "ss" + std::to_string(i);
+                
mutable_field1->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(vs));
+            }
+        }
+        block.insert(ColumnWithTypeAndName(std::move(col), struct_type, 
"struct_col"));
+    }
+}
+
+static void fill_variant_column(Block& block, size_t rows) {
+    // variant
+    DataTypePtr variant_type = 
make_nullable(std::make_shared<DataTypeVariant>());
+
+    // variant column: use JSON strings + deserialize_column_from_json_vector 
to populate ColumnVariant
+    {
+        MutableColumnPtr col = variant_type->create_column();
+        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+        auto& nested = nullable_col->get_nested_column(); // ColumnVariant
+
+        // Prepare JSON strings with variable number of keys per row
+        std::vector<std::string> json_rows;
+        json_rows.reserve(rows);
+        std::vector<Slice> slices;
+        slices.reserve(rows);
+
+        size_t key_cnt = 100;
+        for (size_t i = 0; i < rows; ++i) {
+            // key count cycles between 1, 2, 3, ... for better coverage
+            std::string json = "{";
+            for (size_t k = 0; k < key_cnt; ++k) {

Review Comment:
   The variable name 'key_cnt' uses an abbreviation 'cnt' which is inconsistent 
with other variable names in the codebase. Consider using 'key_count' instead 
for better readability and consistency.
   ```suggestion
           size_t key_count = 100;
           for (size_t i = 0; i < rows; ++i) {
               // key count cycles between 1, 2, 3, ... for better coverage
               std::string json = "{";
               for (size_t k = 0; k < key_count; ++k) {
   ```



##########
be/src/vec/functions/cast/cast_to_variant.h:
##########
@@ -20,84 +20,100 @@
 #include "cast_base.h"
 #include "cast_to_string.h"
 #include "vec/data_types/data_type_variant.h"
+
 namespace doris::vectorized::CastWrapper {
 
-struct CastFromVariant {
-    static Status execute(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
-                          uint32_t result, size_t input_rows_count,
-                          const NullMap::value_type* null_map = nullptr) {
-        auto& data_type_to = block.get_by_position(result).type;
-        const auto& col_with_type_and_name = 
block.get_by_position(arguments[0]);
-        const auto& col_from = col_with_type_and_name.column;
-        const auto& variant = assert_cast<const ColumnVariant&>(*col_from);
-        ColumnPtr col_to = data_type_to->create_column();
-        if (!variant.is_finalized()) {
-            // ColumnVariant should be finalized before parsing, finalize 
maybe modify original column structure
-            variant.assume_mutable()->finalize();
-        }
-        // It's important to convert as many elements as possible in this 
context. For instance,
-        // if the root of this variant column is a number column, converting 
it to a number column
-        // is acceptable. However, if the destination type is a string and 
root is none scalar root, then
-        // we should convert the entire tree to a string.
-        bool is_root_valuable = variant.is_scalar_variant() ||
-                                (!variant.is_null_root() &&
-                                 variant.get_root_type()->get_primitive_type() 
!= INVALID_TYPE &&
-                                 
!is_string_type(data_type_to->get_primitive_type()) &&
-                                 data_type_to->get_primitive_type() != 
TYPE_JSONB);
-        if (is_root_valuable) {
-            ColumnPtr nested = variant.get_root();
-            auto nested_from_type = variant.get_root_type();
-            // DCHECK(nested_from_type->is_nullable());
-            DCHECK(!data_type_to->is_nullable());
-            auto new_context = context->clone();
+// shared implementation for casting from variant to arbitrary non-nullable 
target type
+inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
+                                     const ColumnNumbers& arguments, uint32_t 
result,
+                                     size_t input_rows_count,
+                                     const NullMap::value_type* /*null_map*/,
+                                     const DataTypePtr& data_type_to) {
+    const auto& col_with_type_and_name = block.get_by_position(arguments[0]);
+    const auto& col_from = col_with_type_and_name.column;
+    const auto& variant = assert_cast<const ColumnVariant&>(*col_from);
+    ColumnPtr col_to = data_type_to->create_column();
+
+    if (!variant.is_finalized()) {
+        // ColumnVariant should be finalized before parsing, finalize maybe 
modify original column structure
+        variant.assume_mutable()->finalize();
+    }
+
+    // It's important to convert as many elements as possible in this context. 
For instance,
+    // if the root of this variant column is a number column, converting it to 
a number column
+    // is acceptable. However, if the destination type is a string and root is 
none scalar root, then
+    // we should convert the entire tree to a string.
+    bool is_root_valuable = variant.is_scalar_variant() ||
+                            (!variant.is_null_root() &&
+                             variant.get_root_type()->get_primitive_type() != 
INVALID_TYPE &&
+                             
!is_string_type(data_type_to->get_primitive_type()) &&
+                             data_type_to->get_primitive_type() != TYPE_JSONB);
+
+    if (is_root_valuable) {
+        ColumnPtr nested = variant.get_root();
+        auto nested_from_type = variant.get_root_type();
+        // DCHECK(nested_from_type->is_nullable());
+        DCHECK(!data_type_to->is_nullable());
+        auto new_context = context == nullptr ? nullptr : context->clone();

Review Comment:
   The result of the context clone operation is not checked. The code only 
guards against context being nullptr, but even if context is non-null, clone() 
could return nullptr. Consider checking for clone failure explicitly, or 
document why it is safe to skip the check.
   ```suggestion
           auto new_context = context == nullptr ? nullptr : context->clone();
           if (context != nullptr && new_context == nullptr) {
               return Status::InternalError("Failed to clone FunctionContext in 
cast_from_variant_impl");
           }
   ```



##########
be/src/vec/exec/format/native/native_reader.cpp:
##########
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/native/native_reader.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include "io/file_factory.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/tracing_file_reader.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/native/native_format.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+NativeReader::NativeReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
+                           const TFileRangeDesc& range, io::IOContext* io_ctx, 
RuntimeState* state)
+        : _profile(profile),
+          _scan_params(params),
+          _scan_range(range),
+          _io_ctx(io_ctx),
+          _state(state) {}
+
+NativeReader::~NativeReader() {
+    (void)close();
+}
+
+namespace {
+
+Status validate_and_consume_header(io::FileReaderSPtr file_reader, const 
TFileRangeDesc& range,
+                                   int64_t* file_size, int64_t* 
current_offset, bool* eof) {
+    *file_size = file_reader->size();
+    *current_offset = 0;
+    *eof = (*file_size == 0);
+
+    // Validate and consume Doris Native file header.
+    // Expected layout:
+    // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t 
block_size]...
+    static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + 
sizeof(uint32_t);
+    if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) {
+        return Status::InternalError(
+                "invalid Doris Native file {}, file size {} is smaller than 
header size {}",
+                range.path, *file_size, HEADER_SIZE);
+    }
+
+    char header[HEADER_SIZE];
+    Slice header_slice(header, sizeof(header));
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read));
+    if (bytes_read != sizeof(header)) {
+        return Status::InternalError(
+                "failed to read Doris Native header from file {}, expect {} 
bytes, got {} bytes",
+                range.path, sizeof(header), bytes_read);
+    }
+
+    if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) {
+        return Status::InternalError("invalid Doris Native magic header in 
file {}", range.path);
+    }
+
+    uint32_t version = 0;
+    memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t));
+    if (version != DORIS_NATIVE_FORMAT_VERSION) {
+        return Status::InternalError(
+                "unsupported Doris Native format version {} in file {}, expect 
{}", version,
+                range.path, DORIS_NATIVE_FORMAT_VERSION);
+    }
+
+    *current_offset = sizeof(header);
+    *eof = (*file_size == *current_offset);
+    return Status::OK();
+}
+
+} // namespace
+
+Status NativeReader::init_reader() {
+    if (_file_reader != nullptr) {
+        return Status::OK();
+    }
+
+    // Create underlying file reader. For now we always use random access mode.
+    io::FileSystemProperties system_properties;
+    io::FileDescription file_description;
+    file_description.file_size = -1;
+    if (_scan_range.__isset.file_size) {
+        file_description.file_size = _scan_range.file_size;
+    }
+    file_description.path = _scan_range.path;
+    if (_scan_range.__isset.fs_name) {
+        file_description.fs_name = _scan_range.fs_name;
+    }
+    if (_scan_range.__isset.modification_time) {
+        file_description.mtime = _scan_range.modification_time;
+    } else {
+        file_description.mtime = 0;
+    }
+
+    if (_scan_range.__isset.file_type) {
+        // For compatibility with older FE.
+        system_properties.system_type = _scan_range.file_type;
+    } else {
+        system_properties.system_type = _scan_params.file_type;
+    }
+    system_properties.properties = _scan_params.properties;
+    system_properties.hdfs_params = _scan_params.hdfs_params;
+    if (_scan_params.__isset.broker_addresses) {
+        
system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
+                                                  
_scan_params.broker_addresses.end());
+    }
+
+    io::FileReaderOptions reader_options =
+            FileFactory::get_reader_options(_state, file_description);
+    auto reader_res = io::DelegateReader::create_file_reader(
+            _profile, system_properties, file_description, reader_options,
+            io::DelegateReader::AccessMode::RANDOM, _io_ctx);
+    if (!reader_res.has_value()) {
+        return reader_res.error();
+    }
+    _file_reader = reader_res.value();
+
+    if (_io_ctx) {
+        _file_reader =
+                std::make_shared<io::TracingFileReader>(_file_reader, 
_io_ctx->file_reader_stats);
+    }
+
+    RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, 
&_file_size,
+                                                &_current_offset, &_eof));
+    return Status::OK();
+}
+
+Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
+    if (_eof) {
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(init_reader());
+
+    std::string buff;
+    bool local_eof = false;
+
+    // If we have already loaded the first block for schema probing, use it 
first.
+    if (_first_block_loaded && !_first_block_consumed) {
+        buff = _first_block_buf;
+        local_eof = false;
+    } else {
+        RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof));
+    }
+
+    // If we reach EOF and also read no data for this call, the whole file is 
considered finished.
+    if (local_eof && buff.empty()) {
+        *read_rows = 0;
+        *eof = true;
+        _eof = true;
+        return Status::OK();
+    }
+    // If buffer is empty but we have not reached EOF yet, treat this as an 
error.
+    if (buff.empty()) {
+        return Status::InternalError("read empty native block from file {}", 
_scan_range.path);
+    }
+
+    PBlock pblock;
+    if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) {
+        return Status::InternalError("Failed to parse native PBlock from file 
{}",
+                                     _scan_range.path);
+    }
+
+    // Initialize schema from first block if not done yet.
+    if (!_schema_inited) {
+        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+    }
+
+    size_t uncompressed_bytes = 0;
+    int64_t decompress_time = 0;
+    RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, 
&decompress_time));
+
+    // For external file scan / TVF scenarios, unify all columns as nullable 
to match
+    // GenericReader/SlotDescriptor convention. This ensures schema 
consistency when
+    // some writers emit non-nullable columns.
+    for (size_t i = 0; i < block->columns(); ++i) {
+        auto& col_with_type = block->get_by_position(i);
+        if (!col_with_type.type->is_nullable()) {
+            col_with_type.column = make_nullable(col_with_type.column);
+            col_with_type.type = make_nullable(col_with_type.type);
+        }
+    }
+
+    *read_rows = block->rows();
+    *eof = false;
+
+    if (_first_block_loaded && !_first_block_consumed) {
+        _first_block_consumed = true;
+    }
+
+    // If we reached the physical end of file, mark eof for subsequent calls.
+    if (_current_offset >= _file_size) {
+        _eof = true;
+    }
+
+    return Status::OK();
+}
+
+Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>* 
name_to_type,
+                                 std::unordered_set<std::string>* 
missing_cols) {
+    missing_cols->clear();
+    RETURN_IF_ERROR(init_reader());
+
+    if (!_schema_inited) {
+        // Load first block lazily to initialize schema.
+        if (!_first_block_loaded) {
+            bool local_eof = false;
+            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+            // Treat file as empty only if we reach EOF and there is no block 
data at all.
+            if (local_eof && _first_block_buf.empty()) {
+                return Status::EndOfFile("empty native file {}", 
_scan_range.path);
+            }
+            // Non-EOF but empty buffer means corrupted native file.
+            if (_first_block_buf.empty()) {
+                return Status::InternalError("first native block is empty {}", 
_scan_range.path);
+            }
+            _first_block_loaded = true;
+        }
+
+        PBlock pblock;
+        if (!pblock.ParseFromArray(_first_block_buf.data(),
+                                   static_cast<int>(_first_block_buf.size()))) 
{
+            return Status::InternalError("Failed to parse native PBlock for 
schema from file {}",
+                                         _scan_range.path);
+        }
+        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+    }
+
+    for (size_t i = 0; i < _schema_col_names.size(); ++i) {
+        name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]);
+    }
+    return Status::OK();
+}
+
+Status NativeReader::init_schema_reader() {
+    RETURN_IF_ERROR(init_reader());
+    return Status::OK();
+}
+
+Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                       std::vector<DataTypePtr>* col_types) {
+    RETURN_IF_ERROR(init_reader());
+
+    if (!_schema_inited) {
+        if (!_first_block_loaded) {
+            bool local_eof = false;
+            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+            // Treat file as empty only if we reach EOF and there is no block 
data at all.
+            if (local_eof && _first_block_buf.empty()) {
+                return Status::EndOfFile("empty native file {}", 
_scan_range.path);
+            }
+            // Non-EOF but empty buffer means corrupted native file.
+            if (_first_block_buf.empty()) {
+                return Status::InternalError("first native block is empty {}", 
_scan_range.path);
+            }
+            _first_block_loaded = true;
+        }
+
+        PBlock pblock;
+        if (!pblock.ParseFromArray(_first_block_buf.data(),
+                                   static_cast<int>(_first_block_buf.size()))) 
{
+            return Status::InternalError("Failed to parse native PBlock for 
schema from file {}",
+                                         _scan_range.path);
+        }
+        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+    }
+
+    *col_names = _schema_col_names;
+    *col_types = _schema_col_types;
+    return Status::OK();
+}
+
+Status NativeReader::close() {
+    _file_reader.reset();
+    return Status::OK();
+}
+
+Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) {
+    *eof = false;
+    buff->clear();
+
+    if (_file_reader == nullptr) {
+        RETURN_IF_ERROR(init_reader());
+    }
+
+    if (_current_offset >= _file_size) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    uint64_t len = 0;
+    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, 
&bytes_read));
+    if (bytes_read == 0) {
+        *eof = true;
+        return Status::OK();
+    }
+    if (bytes_read != sizeof(len)) {
+        return Status::InternalError(
+                "Failed to read native block length from file {}, expect {}, "
+                "actual {}",
+                _scan_range.path, sizeof(len), bytes_read);
+    }
+
+    _current_offset += sizeof(len);
+    if (len == 0) {
+        // Empty block, nothing to read.
+        *eof = (_current_offset >= _file_size);
+        return Status::OK();
+    }
+
+    buff->assign(len, '\0');
+    Slice data_slice(buff->data(), len);
+    bytes_read = 0;
+    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, 
&bytes_read));
+    if (bytes_read != len) {
+        return Status::InternalError(
+                "Failed to read native block body from file {}, expect {}, "
+                "actual {}",
+                _scan_range.path, len, bytes_read);
+    }
+
+    _current_offset += len;
+    *eof = (_current_offset >= _file_size);
+    return Status::OK();
+}
+
+Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) {
+    _schema_col_names.clear();
+    _schema_col_types.clear();
+
+    for (const auto& pcol_meta : pblock.column_metas()) {
+        DataTypePtr type = 
make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta));
+        LOG(INFO) << "init_schema_from_pblock, name=" << pcol_meta.name()
+                  << ", type=" << type->get_name();

Review Comment:
   Using LOG(INFO) for schema information can pollute production logs: this 
statement sits inside the loop over column_metas(), so it emits one line per 
column every time a schema is initialized. Consider using VLOG with an 
appropriate verbosity level instead, or removing this log statement if it is 
only needed for debugging.
   ```suggestion
           VLOG(1) << "init_schema_from_pblock, name=" << pcol_meta.name()
                   << ", type=" << type->get_name();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to