Copilot commented on code in PR #61286: URL: https://github.com/apache/doris/pull/61286#discussion_r2924295433
########## be/src/vec/exec/format/native/native_format.h: ########## @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> + +namespace doris::vectorized { + +// Doris Native format file-level constants. +// +// File layout (byte stream): +// +// +-------------------------------+---------------------------+---------------------------+ ... +// | File header | Data block #0 | Data block #1 | ... +// +-------------------------------+---------------------------+---------------------------+ ... +// +// File header (12 bytes total): +// - [0..7] : magic bytes "DORISN1\0" (DORIS_NATIVE_MAGIC) +// - [8..11] : uint32_t format_version (DORIS_NATIVE_FORMAT_VERSION, little-endian) +// +// Each data block i: +// - uint64_t block_size : length in bytes of serialized PBlock (little-endian) +// - uint8_t[block_size] : PBlock protobuf payload produced by Block::serialize() +// +// NativeReader: +// - Detects the optional file header by checking the first 8 bytes against DORIS_NATIVE_MAGIC. +// - If the header is present, it skips 12 bytes and then starts reading blocks as +// [uint64_t block_size][PBlock bytes]... +// - If the header is absent (legacy files), it starts reading blocks from offset 0. Review Comment: The documentation says the header is optional and NativeReader can read legacy files without it, but `validate_and_consume_header()` currently *requires* the magic header and errors if it is absent. Either implement the documented backward-compatible behavior (detect header presence and fall back to offset 0) or update this comment to match the actual requirement. ```suggestion // - Expects a file header at the beginning of the file and verifies the first 8 bytes // against DORIS_NATIVE_MAGIC. // - After validating the header, it skips 12 bytes and then starts reading blocks as // [uint64_t block_size][PBlock bytes]... ``` ########## be/src/vec/exec/scan/file_scanner.cpp: ########## @@ -617,8 +614,10 @@ Status FileScanner::_cast_to_input_block(Block* block) { return_type->get_name()); } idx = _src_block_name_to_idx[slot_desc->col_name()]; + DCHECK(_state != nullptr); + auto ctx = FunctionContext::create_context(_state, {}, {}); RETURN_IF_ERROR( - func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size())); + func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size())); Review Comment: A new `FunctionContext` is created for every column cast inside the loop. This can add overhead for wide schemas (and native/arrow explicitly targets real typed columns). Consider creating a single `FunctionContext` once per `_cast_to_input_block` invocation and reusing it for all casts in the loop, unless specific casts require isolated contexts. ########## regression-test/suites/export_p0/test_export_variant_10k_columns.groovy: ########## @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.io.File +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS; + +suite("test_export_variant_10k_columns", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + // String bucket = context.config.otherConfigs.get("s3BucketName"); + String bucket = getS3BucketName() + + def table_export_name = "test_export_variant_10k" + def table_load_name = "test_load_variant_10k" + def outfile_path_prefix = """${bucket}/export/p0/variant_10k/exp""" + + def waiting_export = { export_label -> + while (true) { + def res = sql """ show export where label = "${export_label}" """ + logger.info("export state: " + res[0][2]) + if (res[0][2] == "FINISHED") { + def json = parseJson(res[0][11]) + assert json instanceof List + // assertEquals("1", json.fileNumber[0][0]) + log.info("outfile_path: ${json.url[0][0]}") + return json.url[0][0]; + } else if (res[0][2] == "CANCELLED") { + throw new IllegalStateException("""export failed: ${res[0][10]}""") + } else { + sleep(5000) + } + } + } + + // 1. Create table with variant column + sql """ DROP TABLE IF EXISTS ${table_export_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_export_name} ( + `id` INT NOT NULL, + `v` VARIANT<PROPERTIES ("variant_max_subcolumns_count" = "2048")> NULL + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + + // 2. Generate data with 10000 keys in variant + // Generate N=100,000 rows. + // Total 10,000 columns, but each row only has 50 columns (sparse). + // This simulates a realistic sparse wide table scenario. + + File dataFile = File.createTempFile("variant_10k_data", ".json") + dataFile.deleteOnExit() + int num_rows = 1000 + try { + dataFile.withWriter { writer -> + StringBuilder sb = new StringBuilder() + for (int i = 1; i <= num_rows; i++) { + sb.setLength(0) + sb.append("{\"id\": ").append(i).append(", \"v\": {") + // Select 50 keys out of 10000 for each row + for (int k = 0; k < 50; k++) { + if (k > 0) sb.append(", ") + // Scatter the keys to ensure coverage of all 10000 columns across rows + int keyIdx = (i + k * 200) % 10000 + sb.append('"k').append(keyIdx).append('":').append(i) + } + sb.append("}}\n") + writer.write(sb.toString()) + } + } + + // 3. Stream Load + streamLoad { + table table_export_name + set 'format', 'json' + set 'read_json_by_line', 'true' + file dataFile.getAbsolutePath() + time 60000 // 60s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(num_rows, json.NumberTotalRows) + assertEquals(num_rows, json.NumberLoadedRows) + } + } + } finally { + dataFile.delete() + } + + // def format = "parquet" + def format = "native" + + // 4. Export to S3 (Parquet) + def uuid = UUID.randomUUID().toString() + // def outFilePath = """/tmp/variant_10k_export""" + def outFilePath = """${outfile_path_prefix}_${uuid}""" + def label = "label_${uuid}" + + try { + sql """ + EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/" + PROPERTIES( + "label" = "${label}", + "format" = "${format}" + ) + WITH S3( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}", + "provider" = "${getS3Provider()}" + ); + """ + + long startExport = System.currentTimeMillis() + def outfile_url = waiting_export.call(label) + long endExport = System.currentTimeMillis() + logger.info("Export ${num_rows} rows with variant took ${endExport - startExport} ms") + + // 5. Validate by S3 TVF + def s3_tvf_sql = """ s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "s3.access_key"= "${ak}", + "s3.secret_key" = "${sk}", + "format" = "${format}", + "provider" = "${getS3Provider()}", + "region" = "${region}" + ) """ + + // Verify row count + def res = sql """ SELECT count(*) FROM ${s3_tvf_sql} """ + assertEquals(num_rows, res[0][0]) + + def value_type = "VARIANT<PROPERTIES (\"variant_max_subcolumns_count\" = \"2048\")>" + if (new Random().nextInt(2) == 0) { + value_type = "text" + } Review Comment: This introduces randomness into a P0 regression test, which can make CI runs flaky and non-reproducible (schema differs between runs). Make the choice deterministic (e.g., run both cases explicitly in separate steps / separate suites, or base it on a fixed seed) so failures are reproducible. ########## regression-test/suites/export_p0/test_export_variant_10k_columns.groovy: ########## @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.io.File +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS; + +suite("test_export_variant_10k_columns", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + // String bucket = context.config.otherConfigs.get("s3BucketName"); + String bucket = getS3BucketName() + + def table_export_name = "test_export_variant_10k" + def table_load_name = "test_load_variant_10k" + def outfile_path_prefix = """${bucket}/export/p0/variant_10k/exp""" + + def waiting_export = { export_label -> + while (true) { + def res = sql """ show export where label = "${export_label}" """ + logger.info("export state: " + res[0][2]) + if (res[0][2] == "FINISHED") { + def json = parseJson(res[0][11]) + assert json instanceof List + // assertEquals("1", json.fileNumber[0][0]) + log.info("outfile_path: ${json.url[0][0]}") + return json.url[0][0]; + } else if (res[0][2] == "CANCELLED") { + throw new IllegalStateException("""export failed: ${res[0][10]}""") + } else { + sleep(5000) + } + } + } Review Comment: The export wait loop has no timeout, so a stuck export can hang the suite indefinitely. Add a maximum wait time (or max iterations) and fail with a clear message when exceeded; alternatively reuse Awaitility here (similar to the load wait later) for consistent timeouts. ########## be/src/vec/exec/format/native/native_reader.cpp: ########## @@ -0,0 +1,369 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/native/native_reader.h" + +#include <gen_cpp/data.pb.h> + +#include "io/file_factory.h" +#include "io/fs/buffered_reader.h" +#include "io/fs/file_reader.h" +#include "io/fs/tracing_file_reader.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/exec/format/native/native_format.h" + +namespace doris::vectorized { + +#include "common/compile_check_begin.h" + +NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state) + : _profile(profile), + _scan_params(params), + _scan_range(range), + _io_ctx(io_ctx), + _state(state) {} + +NativeReader::~NativeReader() { + (void)close(); +} + +namespace { + +Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range, + int64_t* file_size, int64_t* current_offset, bool* eof) { + *file_size = file_reader->size(); + *current_offset = 0; + *eof = (*file_size == 0); + + // Validate and consume Doris Native file header. + // Expected layout: + // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]... + static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); + if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) { + return Status::InternalError( + "invalid Doris Native file {}, file size {} is smaller than header size {}", + range.path, *file_size, HEADER_SIZE); + } + + char header[HEADER_SIZE]; + Slice header_slice(header, sizeof(header)); + size_t bytes_read = 0; + RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read)); + if (bytes_read != sizeof(header)) { + return Status::InternalError( + "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes", + range.path, sizeof(header), bytes_read); + } + + if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) { + return Status::InternalError("invalid Doris Native magic header in file {}", range.path); + } + + uint32_t version = 0; + memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); + if (version != DORIS_NATIVE_FORMAT_VERSION) { + return Status::InternalError( + "unsupported Doris Native format version {} in file {}, expect {}", version, + range.path, DORIS_NATIVE_FORMAT_VERSION); + } + + *current_offset = sizeof(header); + *eof = (*file_size == *current_offset); + return Status::OK(); +} + +} // namespace + +Status NativeReader::init_reader() { + if (_file_reader != nullptr) { + return Status::OK(); + } + + // Create underlying file reader. For now we always use random access mode. + io::FileSystemProperties system_properties; + io::FileDescription file_description; + file_description.file_size = -1; + if (_scan_range.__isset.file_size) { + file_description.file_size = _scan_range.file_size; + } + file_description.path = _scan_range.path; + if (_scan_range.__isset.fs_name) { + file_description.fs_name = _scan_range.fs_name; + } + if (_scan_range.__isset.modification_time) { + file_description.mtime = _scan_range.modification_time; + } else { + file_description.mtime = 0; + } + + if (_scan_range.__isset.file_type) { + // For compatibility with older FE. + system_properties.system_type = _scan_range.file_type; + } else { + system_properties.system_type = _scan_params.file_type; + } + system_properties.properties = _scan_params.properties; + system_properties.hdfs_params = _scan_params.hdfs_params; + if (_scan_params.__isset.broker_addresses) { + system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(), + _scan_params.broker_addresses.end()); + } + + io::FileReaderOptions reader_options = + FileFactory::get_reader_options(_state, file_description); + auto reader_res = io::DelegateReader::create_file_reader( + _profile, system_properties, file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, _io_ctx); + if (!reader_res.has_value()) { + return reader_res.error(); + } + _file_reader = reader_res.value(); + + if (_io_ctx) { + _file_reader = + std::make_shared<io::TracingFileReader>(_file_reader, _io_ctx->file_reader_stats); + } + + RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, &_file_size, + &_current_offset, &_eof)); + return Status::OK(); +} + +Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_eof) { + *read_rows = 0; + *eof = true; + return Status::OK(); + } + + RETURN_IF_ERROR(init_reader()); + + std::string buff; + bool local_eof = false; + + // If we have already loaded the first block for schema probing, use it first. + if (_first_block_loaded && !_first_block_consumed) { + buff = _first_block_buf; + local_eof = false; + } else { + RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof)); + } + + // If we reach EOF and also read no data for this call, the whole file is considered finished. + if (local_eof && buff.empty()) { + *read_rows = 0; + *eof = true; + _eof = true; + return Status::OK(); + } + // If buffer is empty but we have not reached EOF yet, treat this as an error. + if (buff.empty()) { + return Status::InternalError("read empty native block from file {}", _scan_range.path); + } + + PBlock pblock; + if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) { + return Status::InternalError("Failed to parse native PBlock from file {}", + _scan_range.path); + } + + // Initialize schema from first block if not done yet. + if (!_schema_inited) { + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + size_t uncompressed_bytes = 0; + int64_t decompress_time = 0; + RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time)); + + // For external file scan / TVF scenarios, unify all columns as nullable to match + // GenericReader/SlotDescriptor convention. This ensures schema consistency when + // some writers emit non-nullable columns. + for (size_t i = 0; i < block->columns(); ++i) { + auto& col_with_type = block->get_by_position(i); + if (!col_with_type.type->is_nullable()) { + col_with_type.column = make_nullable(col_with_type.column); + col_with_type.type = make_nullable(col_with_type.type); + } + } + + *read_rows = block->rows(); + *eof = false; + + if (_first_block_loaded && !_first_block_consumed) { + _first_block_consumed = true; + } + + // If we reached the physical end of file, mark eof for subsequent calls. + if (_current_offset >= _file_size) { + _eof = true; + } + + return Status::OK(); +} + +Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + missing_cols->clear(); + RETURN_IF_ERROR(init_reader()); + + if (!_schema_inited) { + // Load first block lazily to initialize schema. + if (!_first_block_loaded) { + bool local_eof = false; + RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); + // Treat file as empty only if we reach EOF and there is no block data at all. + if (local_eof && _first_block_buf.empty()) { + return Status::EndOfFile("empty native file {}", _scan_range.path); + } + // Non-EOF but empty buffer means corrupted native file. + if (_first_block_buf.empty()) { + return Status::InternalError("first native block is empty {}", _scan_range.path); + } + _first_block_loaded = true; + } + + PBlock pblock; + if (!pblock.ParseFromArray(_first_block_buf.data(), + static_cast<int>(_first_block_buf.size()))) { + return Status::InternalError("Failed to parse native PBlock for schema from file {}", + _scan_range.path); + } + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + for (size_t i = 0; i < _schema_col_names.size(); ++i) { + name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]); + } + return Status::OK(); +} + +Status NativeReader::init_schema_reader() { + RETURN_IF_ERROR(init_reader()); + return Status::OK(); +} + +Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<DataTypePtr>* col_types) { + RETURN_IF_ERROR(init_reader()); + + if (!_schema_inited) { + if (!_first_block_loaded) { + bool local_eof = false; + RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); + // Treat file as empty only if we reach EOF and there is no block data at all. + if (local_eof && _first_block_buf.empty()) { + return Status::EndOfFile("empty native file {}", _scan_range.path); + } + // Non-EOF but empty buffer means corrupted native file. + if (_first_block_buf.empty()) { + return Status::InternalError("first native block is empty {}", _scan_range.path); + } + _first_block_loaded = true; + } + + PBlock pblock; + if (!pblock.ParseFromArray(_first_block_buf.data(), + static_cast<int>(_first_block_buf.size()))) { + return Status::InternalError("Failed to parse native PBlock for schema from file {}", + _scan_range.path); + } + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + *col_names = _schema_col_names; + *col_types = _schema_col_types; + return Status::OK(); +} + +Status NativeReader::close() { + _file_reader.reset(); + return Status::OK(); +} + +Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) { + *eof = false; + buff->clear(); + + if (_file_reader == nullptr) { + RETURN_IF_ERROR(init_reader()); + } + + if (_current_offset >= _file_size) { + *eof = true; + return Status::OK(); + } + + uint64_t len = 0; + Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len)); + size_t bytes_read = 0; + RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read)); + if (bytes_read == 0) { + *eof = true; + return Status::OK(); + } + if (bytes_read != sizeof(len)) { + return Status::InternalError( + "Failed to read native block length from file {}, expect {}, " + "actual {}", + _scan_range.path, sizeof(len), bytes_read); + } + + _current_offset += sizeof(len); + if (len == 0) { + // Empty block, nothing to read. + *eof = (_current_offset >= _file_size); + return Status::OK(); + } + + buff->assign(len, '\0'); + Slice data_slice(buff->data(), len); + bytes_read = 0; + RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read)); + if (bytes_read != len) { + return Status::InternalError( + "Failed to read native block body from file {}, expect {}, " + "actual {}", + _scan_range.path, len, bytes_read); + } Review Comment: NativeReader allocates `len` bytes directly from the file-provided block length without validating it against remaining file size (or any upper bound). A malformed/corrupt file can trigger huge allocations (OOM) before `read_at` fails. Add a validation before `buff->assign(...)`, e.g. ensure `len <= (_file_size - _current_offset)` and optionally enforce a reasonable maximum native block size to protect BE. ########## regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods Review Comment: This import is not used anywhere in the file. Removing it reduces noise and avoids misleading readers about expected I/O helpers. ```suggestion ``` ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java: ########## @@ -391,6 +393,16 @@ private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int start) parsedNodes += fieldType.value(); } type = new StructType(fields); + } else if (tPrimitiveType == TPrimitiveType.VARIANT) { + // Preserve VARIANT-specific properties from PTypeNode, especially variant_max_subcolumns_count. + int maxSubcolumns = typeNode.getVariantMaxSubcolumnsCount(); + // Currently no predefined fields are carried in PTypeNode for VARIANT, so use empty list and default + // values for other properties. + type = new VariantType(new ArrayList<>(), maxSubcolumns, + /*enableTypedPathsToSparse*/ false, + /*variantMaxSparseColumnStatisticsSize*/ 10000, + /*variantSparseHashShardCount*/ 0); Review Comment: This constructs `VariantType` with several hard-coded magic values (e.g. `variantMaxSparseColumnStatisticsSize = 10000`, shard count `0`). If defaults/configs change elsewhere, ExternalFileTVF may diverge from normal VARIANT semantics. Prefer using a centralized factory/default-builder (or Config-backed defaults) so only `variant_max_subcolumns_count` is overridden here, and other properties stay consistent with the rest of FE/BE. ```suggestion // Currently no predefined fields are carried in PTypeNode for VARIANT, so use empty list and // Config-backed defaults for other properties to keep semantics consistent across FE/BE. type = new VariantType(new ArrayList<>(), maxSubcolumns, Config.enable_typed_paths_to_sparse, Config.variant_max_sparse_column_statistics_size, Config.variant_sparse_hash_shard_count); ``` ########## regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_outfile_native", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def tableName = "outfile_native_test" + def outFilePath = "${bucket}/outfile/native/exp_" + + // Export helper: write to S3 and return the URL output by FE + def outfile_to_s3 = { + def res = sql """ + SELECT * FROM ${tableName} t ORDER BY id + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS native + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + return res[0][3] + } + + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `id` INT NOT NULL, + `c_date` DATE NOT NULL, + `c_dt` DATETIME NOT NULL, + `c_str` VARCHAR(20), + `c_int` INT, + `c_tinyint` TINYINT, + `c_bool` boolean, + `c_double` double + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + + // Insert 10 rows of test data (the last row is all NULL) + StringBuilder sb = new StringBuilder() + int i = 1 + for (; i < 10000; i ++) { + sb.append(""" + (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i % 128}, true, ${i}.${i}), + """) + } + sb.append(""" + (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, NULL, NULL, NULL) + """) + sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """ Review Comment: The comment says 'Insert 10 rows', but the loop inserts 9,999 rows plus a final row (10,000 total). Update the comment to match the actual data volume so future readers understand the test’s intent and runtime characteristics. ########## be/src/service/internal_service.cpp: ########## @@ -856,6 +857,11 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr reader = vectorized::OrcReader::create_unique(params, range, "", &io_ctx); break; } + case TFileFormatType::FORMAT_NATIVE: { + reader = vectorized::NativeReader::create_unique(profile.get(), params, range, &io_ctx, + nullptr); Review Comment: `NativeReader` is constructed with `RuntimeState* state = nullptr` here, but `NativeReader::init_reader()` passes `_state` into `FileFactory::get_reader_options(_state, ...)`. If `get_reader_options` (or other downstream logic) assumes non-null, this will crash during schema fetch. Pass a valid RuntimeState (if available in this RPC path) or update `NativeReader`/FileFactory usage to safely handle a null state for schema-only operations. ```suggestion RuntimeState dummy_state; reader = vectorized::NativeReader::create_unique(profile.get(), params, range, &io_ctx, &dummy_state); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
