This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit f47edb9318fc12cab9662439396e663fe661541b Author: Mingyu Chen <[email protected]> AuthorDate: Tue Feb 21 14:14:32 2023 +0800 [fix](tvf) fix bug that failed to get schema of tvf when file is empty (#16928) In the previous implementation, when querying a tvf, FE gets the schema from BE, and BE tries to open the first file to read its schema info. For the orc and parquet formats, however, BE returned an error if the file was empty, i.e. it contained no rows (orc) or no row groups (parquet). Even for such a file, we can still get the schema info from the file's footer, so we should handle the empty file and return its schema info correctly. Also modify the catalog doc to add some FAQ. --- be/src/service/internal_service.cpp | 6 +++--- be/src/vec/exec/format/csv/csv_reader.cpp | 4 ++-- be/src/vec/exec/format/orc/vorc_reader.cpp | 11 ++++------- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 8 ++------ be/src/vec/exec/scan/vfile_scanner.cpp | 4 ++-- 5 files changed, 13 insertions(+), 20 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 5922fe6419..aa3510452d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -384,7 +384,8 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl // this will influence query execution, because the pthreads under bthread may be // exhausted, so we put this to a local thread pool to process int64_t submit_task_time_ns = MonotonicNanos(); - bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, submit_task_time_ns, + this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; @@ -555,8 +556,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c case TFileFormatType::FORMAT_CSV_DEFLATE: { // file_slots is no use std::vector<SlotDescriptor*> 
file_slots; - reader.reset( - new vectorized::CsvReader(profile.get(), params, range, file_slots)); + reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots)); break; } case TFileFormatType::FORMAT_PARQUET: { diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index c7099b24c7..ce537d93cf 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -115,7 +115,7 @@ Status CsvReader::init_reader(bool is_load) { RETURN_IF_ERROR(real_reader->open()); if (real_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { - return Status::EndOfFile("Empty File"); + return Status::EndOfFile("init reader failed, empty csv file: " + _range.path); } // get column_separator and line_delimiter @@ -544,7 +544,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _range.file_size, 0, _file_reader)); RETURN_IF_ERROR(_file_reader->open()); if (_file_reader->size() == 0) { - return Status::EndOfFile("Empty File"); + return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path); } // get column_separator and line_delimiter diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 57e0b4fc89..0840155203 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -142,7 +142,7 @@ Status OrcReader::init_reader( _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader.release()); } if (_file_reader->getLength() == 0) { - return Status::EndOfFile("Empty orc file"); + return Status::EndOfFile("init reader failed, empty orc file: " + _scan_range.path); } // create orc reader @@ -153,7 +153,8 @@ Status OrcReader::init_reader( return Status::InternalError("Init OrcReader failed. 
reason = {}", e.what()); } if (_reader->getNumberOfRows() == 0) { - return Status::EndOfFile("Empty orc file"); + return Status::EndOfFile("init reader failed, empty orc file with row num 0: " + + _scan_range.path); } // _init_bloom_filter(colname_to_value_range); @@ -197,7 +198,7 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names, _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader.release()); } if (_file_reader->getLength() == 0) { - return Status::EndOfFile("Empty orc file"); + return Status::EndOfFile("get parsed schema fail, empty orc file: " + _scan_range.path); } // create orc reader @@ -208,10 +209,6 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names, return Status::InternalError("Init OrcReader failed. reason = {}", e.what()); } - if (_reader->getNumberOfRows() == 0) { - return Status::EndOfFile("Empty orc file"); - } - auto& root_type = _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { col_names->emplace_back(_get_field_name_lower_case(&root_type, i)); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 35302f9306..d873260010 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -147,7 +147,7 @@ Status ParquetReader::_open_file() { if (_file_metadata == nullptr) { RETURN_IF_ERROR(_file_reader->open()); if (_file_reader->size() == 0) { - return Status::EndOfFile("Empty Parquet File"); + return Status::EndOfFile("open file failed, empty parquet file: " + _scan_range.path); } RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata)); } @@ -179,7 +179,7 @@ Status ParquetReader::init_reader( SCOPED_RAW_TIMER(&_statistics.parse_meta_time); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { - return Status::EndOfFile("Empty Parquet File"); + return Status::EndOfFile("init reader failed, empty 
parquet file: " + _scan_range.path); } // all_column_names are all the columns required by user sql. // missing_column_names are the columns required by user sql but not in the parquet file, @@ -350,10 +350,6 @@ Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names, _t_metadata = &_file_metadata->to_thrift(); _total_groups = _t_metadata->row_groups.size(); - if (_total_groups == 0) { - return Status::EndOfFile("Empty Parquet File"); - } - auto schema_desc = _file_metadata->schema(); for (int i = 0; i < schema_desc.size(); ++i) { // Get the Column Reader for the boolean column diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 8619719194..58eb496d71 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -494,8 +494,8 @@ Status VFileScanner::_get_next_reader() { IcebergTableReader* iceberg_reader = new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state, _params, range, _kv_cache); - iceberg_reader->init_reader(_file_col_names, _col_id_name_map, - _colname_to_value_range, _push_down_expr); + init_status = iceberg_reader->init_reader(_file_col_names, _col_id_name_map, + _colname_to_value_range, _push_down_expr); RETURN_IF_ERROR(iceberg_reader->init_row_filters(range)); _cur_reader.reset((GenericReader*)iceberg_reader); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
