This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 350095e569a15cfb8c12b253b3230d17f6125417 Author: Mingyu Chen <[email protected]> AuthorDate: Mon Jul 17 13:37:02 2023 +0800 [improvement](catalog) reduce the size of thrift params for external table query (#21771) ### 1 In the previous implementation, there was a `TFileScanRange` for each FileSplit, and each `TFileScanRange` contained a list of `TFileRangeDesc` and a `TFileScanRangeParams`. So if there were thousands of FileSplits, there would be thousands of `TFileScanRange` objects, which caused the thrift data sent to BE to be too large, resulting in: 1. the rpc sending the fragment may fail due to timeout 2. FE may OOM For a given query request, the `TFileScanRangeParams` is the common part and is the same for all `TFileScanRange` objects. So I moved it to `TExecPlanFragmentParams`. After that, for each FileSplit, there is only a list of `TFileRangeDesc`. In my test, when querying a hive table with 100000 partitions, the size of the thrift data was reduced from 151MB to 15MB, and the above 2 issues are gone. ### 2 Support disabling the file meta cache for the parquet footer by setting `max_external_file_meta_cache_num` <= 0. I found that for some wide tables the footer is too large (1MB after compaction, and much more after being deserialized to thrift), and it consumes too much memory on BE when there are many files. This will be optimized later; here I just add support for disabling this cache. 
--- be/src/olap/push_handler.cpp | 7 +- be/src/runtime/fragment_mgr.cpp | 6 ++ be/src/runtime/query_context.h | 4 ++ be/src/vec/exec/format/csv/csv_reader.cpp | 21 +++++- be/src/vec/exec/format/json/new_json_reader.cpp | 7 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 30 ++++++-- be/src/vec/exec/format/orc/vorc_reader.h | 8 ++- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 7 +- be/src/vec/exec/scan/avro_jni_reader.cpp | 8 ++- be/src/vec/exec/scan/new_file_scan_node.cpp | 11 ++- be/src/vec/exec/scan/vfile_scanner.cpp | 67 ++++++++++-------- be/src/vec/exec/scan/vfile_scanner.h | 2 +- .../doris/planner/external/FileQueryScanNode.java | 82 ++++++++++++---------- .../main/java/org/apache/doris/qe/Coordinator.java | 19 ++++- .../ExternalFileTableValuedFunction.java | 5 +- gensrc/thrift/PaloInternalService.thrift | 5 ++ gensrc/thrift/PlanNodes.thrift | 8 +++ .../hive/test_external_brown.groovy | 2 +- 18 files changed, 211 insertions(+), 88 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 380576fdc1..d852b03814 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -322,7 +322,6 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange& if (0 == _ranges.size()) { return; } - _file_params.file_type = _ranges[0].file_type; _file_params.format_type = _ranges[0].format_type; _file_params.src_tuple_id = _params.src_tuple_id; _file_params.dest_tuple_id = _params.dest_tuple_id; @@ -336,6 +335,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange& for (int i = 0; i < _ranges.size(); ++i) { TFileRangeDesc file_range; + // TODO(cmy): in previous implementation, the file_type is set in _file_params + // and it use _ranges[0].file_type. + // Later, this field is moved to TFileRangeDesc, but here we still only use _ranges[0]'s + // file_type. + // Because I don't know if other range has this field, so just keep it same as before. 
+ file_range.file_type = _ranges[0].file_type; file_range.load_id = _ranges[i].load_id; file_range.path = _ranges[i].path; file_range.start_offset = _ranges[i].start_offset; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b5fcb933cb..f8e7750151 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -676,6 +676,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->query_id = query_id; RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl))); + + // set file scan range params + if (params.__isset.file_scan_params) { + query_ctx->file_scan_range_params_map = params.file_scan_params; + } + query_ctx->coord_addr = params.coord; LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo) << " coord_addr " << query_ctx->coord_addr diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 8ed0150db5..c7651a687d 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -197,6 +197,10 @@ public: std::vector<TUniqueId> fragment_ids; + // plan node id -> TFileScanRangeParams + // only for file scan node + std::map<int, TFileScanRangeParams> file_scan_range_params_map; + private: ExecEnv* _exec_env; vectorized::VecDateTimeValue _start_time; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 09531a2c2b..55a43ed4c5 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -88,7 +88,12 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte _io_ctx(io_ctx) { _file_format_type = _params.format_type; _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO; - _file_compress_type = _params.compress_type; + if (_range.__isset.compress_type) { + // for compatibility + _file_compress_type = _range.compress_type; + 
} else { + _file_compress_type = _params.compress_type; + } _size = _range.size; _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); @@ -110,7 +115,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params _decompressor(nullptr), _io_ctx(io_ctx) { _file_format_type = _params.format_type; - _file_compress_type = _params.compress_type; + if (_range.__isset.compress_type) { + // for compatibility + _file_compress_type = _range.compress_type; + } else { + _file_compress_type = _params.compress_type; + } _size = _range.size; _init_system_properties(); _init_file_description(); @@ -119,7 +129,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params CsvReader::~CsvReader() = default; void CsvReader::_init_system_properties() { - _system_properties.system_type = _params.file_type; + if (_range.__isset.file_type) { + // for compatibility + _system_properties.system_type = _range.file_type; + } else { + _system_properties.system_type = _params.file_type; + } _system_properties.properties = _params.properties; _system_properties.hdfs_params = _params.hdfs_params; if (_params.__isset.broker_addresses) { diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 7d17f650c6..94c376ae69 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -135,7 +135,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams } void NewJsonReader::_init_system_properties() { - _system_properties.system_type = _params.file_type; + if (_range.__isset.file_type) { + // for compatibility + _system_properties.system_type = _range.file_type; + } else { + _system_properties.system_type = _params.file_type; + } _system_properties.properties = _params.properties; _system_properties.hdfs_params = _params.hdfs_params; if (_params.__isset.broker_addresses) { diff --git 
a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 7f87f77590..001d32b821 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -181,7 +181,9 @@ void OrcReader::_collect_profile_on_close() { COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes); COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time); COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time); - COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time); + COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time); + COUNTER_UPDATE(_orc_profile.init_column_time, _statistics.init_column_time); + COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time); COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time); COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time); } @@ -200,7 +202,11 @@ void OrcReader::_init_profile() { _orc_profile.read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES); _orc_profile.column_read_time = ADD_CHILD_TIMER(_profile, "ColumnReadTime", orc_profile); _orc_profile.get_batch_time = ADD_CHILD_TIMER(_profile, "GetBatchTime", orc_profile); - _orc_profile.parse_meta_time = ADD_CHILD_TIMER(_profile, "ParseMetaTime", orc_profile); + _orc_profile.create_reader_time = + ADD_CHILD_TIMER(_profile, "CreateReaderTime", orc_profile); + _orc_profile.init_column_time = ADD_CHILD_TIMER(_profile, "InitColumnTime", orc_profile); + _orc_profile.set_fill_column_time = + ADD_CHILD_TIMER(_profile, "SetFillColumnTime", orc_profile); _orc_profile.decode_value_time = ADD_CHILD_TIMER(_profile, "DecodeValueTime", orc_profile); _orc_profile.decode_null_map_time = ADD_CHILD_TIMER(_profile, "DecodeNullMapTime", orc_profile); @@ -254,9 +260,14 @@ Status OrcReader::init_reader( not_single_slot_filter_conjuncts->end()); } _obj_pool = 
std::make_shared<ObjectPool>(); - SCOPED_RAW_TIMER(&_statistics.parse_meta_time); - RETURN_IF_ERROR(_create_file_reader()); - RETURN_IF_ERROR(_init_read_columns()); + { + SCOPED_RAW_TIMER(&_statistics.create_reader_time); + RETURN_IF_ERROR(_create_file_reader()); + } + { + SCOPED_RAW_TIMER(&_statistics.init_column_time); + RETURN_IF_ERROR(_init_read_columns()); + } return Status::OK(); } @@ -646,7 +657,7 @@ Status OrcReader::set_fill_columns( const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& partition_columns, const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) { - SCOPED_RAW_TIMER(&_statistics.parse_meta_time); + SCOPED_RAW_TIMER(&_statistics.set_fill_column_time); // std::unordered_map<column_name, std::pair<col_id, slot_id>> std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns; @@ -864,7 +875,12 @@ void OrcReader::_init_bloom_filter( } void OrcReader::_init_system_properties() { - _system_properties.system_type = _scan_params.file_type; + if (_scan_range.__isset.file_type) { + // for compatibility + _system_properties.system_type = _scan_range.file_type; + } else { + _system_properties.system_type = _scan_params.file_type; + } _system_properties.properties = _scan_params.properties; _system_properties.hdfs_params = _scan_params.hdfs_params; if (_scan_params.__isset.broker_addresses) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 85a73e0e21..d05fb4c478 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -126,7 +126,9 @@ public: int64_t fs_read_bytes = 0; int64_t column_read_time = 0; int64_t get_batch_time = 0; - int64_t parse_meta_time = 0; + int64_t create_reader_time = 0; + int64_t init_column_time = 0; + int64_t set_fill_column_time = 0; int64_t decode_value_time = 0; int64_t decode_null_map_time = 0; }; @@ -200,7 +202,9 @@ private: RuntimeProfile::Counter* read_bytes; 
RuntimeProfile::Counter* column_read_time; RuntimeProfile::Counter* get_batch_time; - RuntimeProfile::Counter* parse_meta_time; + RuntimeProfile::Counter* create_reader_time; + RuntimeProfile::Counter* init_column_time; + RuntimeProfile::Counter* set_fill_column_time; RuntimeProfile::Counter* decode_value_time; RuntimeProfile::Counter* decode_null_map_time; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index fed33b6d28..ed4aa6e6ae 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -269,7 +269,12 @@ Status ParquetReader::open() { } void ParquetReader::_init_system_properties() { - _system_properties.system_type = _scan_params.file_type; + if (_scan_range.__isset.file_type) { + // for compatibility + _system_properties.system_type = _scan_range.file_type; + } else { + _system_properties.system_type = _scan_params.file_type; + } _system_properties.properties = _scan_params.properties; _system_properties.hdfs_params = _scan_params.hdfs_params; if (_scan_params.__isset.broker_addresses) { diff --git a/be/src/vec/exec/scan/avro_jni_reader.cpp b/be/src/vec/exec/scan/avro_jni_reader.cpp index 5d1ef40cbc..ffca8682c4 100644 --- a/be/src/vec/exec/scan/avro_jni_reader.cpp +++ b/be/src/vec/exec/scan/avro_jni_reader.cpp @@ -74,7 +74,13 @@ Status AvroJNIReader::init_fetch_table_reader( index++; } - TFileType::type type = _params.file_type; + TFileType::type type; + if (_range.__isset.file_type) { + // for compatibility + type = _range.file_type; + } else { + type = _params.file_type; + } std::map<String, String> required_param = { {"required_fields", required_fields.str()}, {"columns_types", columns_types.str()}, diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 127539d26e..0b8d318551 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ 
b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -49,6 +49,12 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status NewFileScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); + if (state->get_query_ctx() != nullptr && + state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) { + TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[id()]; + _input_tuple_id = params.src_tuple_id; + _output_tuple_id = params.dest_tuple_id; + } return Status::OK(); } @@ -74,7 +80,10 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ _scan_ranges.shrink_to_fit(); LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } - if (scan_ranges.size() > 0) { + if (scan_ranges.size() > 0 && + scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) { + // for compatibility. + // in new implement, the tuple id is set in prepare phase _input_tuple_id = scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id; _output_tuple_id = diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index b95d2aa773..38574262de 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -89,7 +89,6 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t const TFileScanRange& scan_range, RuntimeProfile* profile, ShardedKVCache* kv_cache) : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), - _params(scan_range.params), _ranges(scan_range.ranges), _next_range(0), _cur_reader(nullptr), @@ -99,6 +98,14 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t if (scan_range.params.__isset.strict_mode) { _strict_mode = scan_range.params.strict_mode; } + + if (state->get_query_ctx() != nullptr && + state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 
0) { + _params = &(state->get_query_ctx()->file_scan_range_params_map[parent->id()]); + } else { + CHECK(scan_range.__isset.params); + _params = &(scan_range.params); + } } Status VFileScanner::prepare( @@ -133,13 +140,13 @@ Status VFileScanner::prepare( std::vector<TupleId>({_input_tuple_desc->id()}), std::vector<bool>({false}))); // prepare pre filters - if (_params.__isset.pre_filter_exprs_list) { + if (_params->__isset.pre_filter_exprs_list) { RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_trees( - _params.pre_filter_exprs_list, _pre_conjunct_ctxs)); - } else if (_params.__isset.pre_filter_exprs) { + _params->pre_filter_exprs_list, _pre_conjunct_ctxs)); + } else if (_params->__isset.pre_filter_exprs) { VExprContextSPtr context; RETURN_IF_ERROR( - doris::vectorized::VExpr::create_expr_tree(_params.pre_filter_exprs, context)); + doris::vectorized::VExpr::create_expr_tree(_params->pre_filter_exprs, context)); _pre_conjunct_ctxs.emplace_back(context); } @@ -569,9 +576,10 @@ Status VFileScanner::_get_next_reader() { // create reader for specific format Status init_status; - TFileFormatType::type format_type = _params.format_type; + TFileFormatType::type format_type = _params->format_type; // JNI reader can only push down column value range - bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI; + bool push_down_predicates = + !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI; if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params && range.table_format_params.table_format_type == "hudi") { if (range.table_format_params.hudi_params.delta_logs.empty()) { @@ -598,9 +606,9 @@ Status VFileScanner::_get_next_reader() { ->init_reader(_colname_to_value_range); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "hudi") { - _cur_reader = - HudiJniReader::create_unique(_params, range.table_format_params.hudi_params, - _file_slot_descs, _state, 
_profile); + _cur_reader = HudiJniReader::create_unique(*_params, + range.table_format_params.hudi_params, + _file_slot_descs, _state, _profile); init_status = ((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range); } @@ -608,9 +616,11 @@ Status VFileScanner::_get_next_reader() { } case TFileFormatType::FORMAT_PARQUET: { std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique( - _profile, _params, range, _state->query_options().batch_size, + _profile, *_params, range, _state->query_options().batch_size, const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state, - ExecEnv::GetInstance()->file_meta_cache(), + config::max_external_file_meta_cache_num <= 0 + ? nullptr + : ExecEnv::GetInstance()->file_meta_cache(), _state->query_options().enable_parquet_lazy_mat); { SCOPED_TIMER(_open_reader_timer); @@ -627,7 +637,7 @@ Status VFileScanner::_get_next_reader() { range.table_format_params.table_format_type == "iceberg") { std::unique_ptr<IcebergTableReader> iceberg_reader = IcebergTableReader::create_unique(std::move(parquet_reader), _profile, - _state, _params, range, _kv_cache, + _state, *_params, range, _kv_cache, _io_ctx.get()); init_status = iceberg_reader->init_reader( _file_col_names, _col_id_name_map, _colname_to_value_range, @@ -649,7 +659,7 @@ Status VFileScanner::_get_next_reader() { } case TFileFormatType::FORMAT_ORC: { std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique( - _profile, _state, _params, range, _state->query_options().batch_size, + _profile, _state, *_params, range, _state->query_options().batch_size, _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat); if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) { _push_down_conjuncts.resize(_conjuncts.size()); @@ -662,7 +672,7 @@ Status VFileScanner::_get_next_reader() { range.table_format_params.table_format_type == "transactional_hive") { 
std::unique_ptr<TransactionalHiveReader> tran_orc_reader = TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, - _state, _params, range, + _state, *_params, range, _io_ctx.get()); init_status = tran_orc_reader->init_reader( _file_col_names, _colname_to_value_range, _push_down_conjuncts, @@ -686,13 +696,13 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: case TFileFormatType::FORMAT_PROTO: { - _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, _params, range, + _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range, _file_slot_descs, _io_ctx.get()); init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load); break; } case TFileFormatType::FORMAT_JSON: { - _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, _params, range, + _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range, _file_slot_descs, &_scanner_eof, _io_ctx.get(), _is_dynamic_schema); init_status = @@ -700,13 +710,14 @@ Status VFileScanner::_get_next_reader() { break; } case TFileFormatType::FORMAT_AVRO: { - _cur_reader = AvroJNIReader::create_unique(_state, _profile, _params, _file_slot_descs); + _cur_reader = + AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs); init_status = ((AvroJNIReader*)(_cur_reader.get())) ->init_fetch_table_reader(_colname_to_value_range); break; } default: - return Status::InternalError("Not supported file format: {}", _params.format_type); + return Status::InternalError("Not supported file format: {}", _params->format_type); } if (init_status.is<END_OF_FILE>()) { @@ -810,8 +821,8 @@ Status VFileScanner::_init_expr_ctxes() { } } - _num_of_columns_from_file = _params.num_of_columns_from_file; - for (const auto& slot_info : _params.required_slots) { + _num_of_columns_from_file = _params->num_of_columns_from_file; + for (const auto& slot_info : 
_params->required_slots) { auto slot_id = slot_info.slot_id; auto it = full_src_slot_map.find(slot_id); if (it == std::end(full_src_slot_map)) { @@ -843,8 +854,8 @@ Status VFileScanner::_init_expr_ctxes() { continue; } vectorized::VExprContextSPtr ctx; - auto it = _params.default_value_of_src_slot.find(slot_desc->id()); - if (it != std::end(_params.default_value_of_src_slot)) { + auto it = _params->default_value_of_src_slot.find(slot_desc->id()); + if (it != std::end(_params->default_value_of_src_slot)) { if (!it->second.nodes.empty()) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx)); RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc)); @@ -857,14 +868,14 @@ Status VFileScanner::_init_expr_ctxes() { if (_is_load) { // follow desc expr map is only for load task. - bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; + bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans; int idx = 0; for (auto slot_desc : _output_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { continue; } - auto it = _params.expr_of_dest_slot.find(slot_desc->id()); - if (it == std::end(_params.expr_of_dest_slot)) { + auto it = _params->expr_of_dest_slot.find(slot_desc->id()); + if (it == std::end(_params->expr_of_dest_slot)) { return Status::InternalError("No expr for dest slot, id={}, name={}", slot_desc->id(), slot_desc->col_name()); } @@ -879,8 +890,8 @@ Status VFileScanner::_init_expr_ctxes() { _dest_slot_name_to_idx[slot_desc->col_name()] = idx++; if (has_slot_id_map) { - auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); - if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { + auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id()); + if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) { _src_slot_descs_order_by_dest.emplace_back(nullptr); } else { auto _src_slot_it = full_src_slot_map.find(it1->second); diff --git 
a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 0b1d7a558a..9a16d056ed 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -89,7 +89,7 @@ protected: protected: std::unique_ptr<TextConverter> _text_converter; - const TFileScanRangeParams& _params; + const TFileScanRangeParams* _params; const std::vector<TFileRangeDesc>& _ranges; int _next_range; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 09ed6361e5..5b6238bf14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -180,11 +180,6 @@ public abstract class FileQueryScanNode extends FileScanNode { } // Update required slots and column_idxs in scanRangeLocations. setColumnPositionMapping(); - for (TScanRangeLocations location : scanRangeLocations) { - TFileScanRangeParams rangeParams = location.getScanRange().getExtScanRange().getFileScanRange().getParams(); - rangeParams.setRequiredSlots(params.getRequiredSlots()); - rangeParams.setColumnIdxs(params.getColumnIdxs()); - } } @Override @@ -227,6 +222,10 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setColumnIdxs(columnIdxs); } + public TFileScanRangeParams getFileScanRangeParams() { + return params; + } + @Override public void createScanRangeLocations() throws UserException { long start = System.currentTimeMillis(); @@ -237,41 +236,24 @@ public abstract class FileQueryScanNode extends FileScanNode { } TFileFormatType fileFormatType = getFileFormatType(); params.setFormatType(fileFormatType); + boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON; + if (isCsvOrJson) { + params.setFileAttributes(getFileAttributes()); + } + + Map<String, String> 
locationProperties = getLocationProperties(); + // for JNI, only need to set properties + if (fileFormatType == TFileFormatType.FORMAT_JNI) { + params.setProperties(locationProperties); + } + List<String> pathPartitionKeys = getPathPartitionKeys(); for (Split split : inputSplits) { - TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params); FileSplit fileSplit = (FileSplit) split; TFileType locationType = getLocationType(fileSplit.getPath().toString()); - scanRangeParams.setFileType(locationType); - TFileCompressType fileCompressType = getFileCompressType(fileSplit); - scanRangeParams.setCompressType(fileCompressType); - boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON; - if (isCsvOrJson) { - scanRangeParams.setFileAttributes(getFileAttributes()); - } - - // set hdfs params for hdfs file type. - Map<String, String> locationProperties = getLocationProperties(); - if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) { - scanRangeParams.setProperties(locationProperties); - } - if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { - String fsName = getFsName(fileSplit); - THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); - tHdfsParams.setFsName(fsName); - scanRangeParams.setHdfsParams(tHdfsParams); - - if (locationType == TFileType.FILE_BROKER) { - FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); - if (broker == null) { - throw new UserException("No alive broker."); - } - scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); - } - } - - TScanRangeLocations curLocations = newLocations(scanRangeParams); + setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties); + TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. 
// Otherwise, use the values in file path. boolean isACID = false; @@ -285,6 +267,8 @@ public abstract class FileQueryScanNode extends FileScanNode { TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, locationType); + TFileCompressType fileCompressType = getFileCompressType(fileSplit); + rangeDesc.setCompressType(fileCompressType); if (isACID) { HiveSplit hiveSplit = (HiveSplit) split; hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); @@ -332,11 +316,31 @@ public abstract class FileQueryScanNode extends FileScanNode { scanRangeLocations.size(), (System.currentTimeMillis() - start)); } - private TScanRangeLocations newLocations(TFileScanRangeParams params) { + private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit, + Map<String, String> locationProperties) throws UserException { + if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { + if (!params.isSetHdfsParams()) { + String fsName = getFsName(fileSplit); + THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); + tHdfsParams.setFsName(fsName); + params.setHdfsParams(tHdfsParams); + } + + if (locationType == TFileType.FILE_BROKER && !params.isSetBrokerAddresses()) { + FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + if (broker == null) { + throw new UserException("No alive broker."); + } + params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); + } + } else if (locationType == TFileType.FILE_S3 && !params.isSetProperties()) { + params.setProperties(locationProperties); + } + } + + private TScanRangeLocations newLocations() { // Generate on file scan range TFileScanRange fileScanRange = new TFileScanRange(); - fileScanRange.setParams(params); - // Scan range TExternalScanRange externalScanRange = new TExternalScanRange(); externalScanRange.setFileScanRange(fileScanRange); @@ -361,6 +365,7 @@ public abstract class 
FileQueryScanNode extends FileScanNode { rangeDesc.setColumnsFromPath(columnsFromPath); rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); + rangeDesc.setFileType(locationType); if (locationType == TFileType.FILE_HDFS) { rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); } else if (locationType == TFileType.FILE_S3 @@ -424,3 +429,4 @@ public abstract class FileQueryScanNode extends FileScanNode { } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 180e3e1c75..a81bd1984b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -61,6 +61,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; import org.apache.doris.planner.external.ExternalScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.planner.external.FileScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; @@ -83,6 +84,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TExternalScanRange; import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineFragmentParams; @@ -165,6 +167,9 @@ public class Coordinator { // copied from TQueryExecRequest; constant across all fragments private final TDescriptorTable descTable; + // scan node id -> TFileScanRangeParams + private Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap = Maps.newHashMap(); + // Why do we use query global? 
// When `NOW()` function is in sql, we need only one now(), // but, we execute `NOW()` distributed. @@ -1937,6 +1942,11 @@ public class Coordinator { k -> Sets.newHashSet()); scanNodeIds.add(scanNode.getId().asInt()); + if (scanNode instanceof FileQueryScanNode) { + fileScanRangeParamsMap.put( + scanNode.getId().asInt(), ((FileQueryScanNode) scanNode).getFileScanRangeParams()); + } + FragmentScanRangeAssignment assignment = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment; boolean fragmentContainsColocateJoin = isColocateFragment(scanNode.getFragment(), @@ -2294,7 +2304,7 @@ public class Coordinator { return executionProfile.isAllInstancesDone(); } - // map from an impalad host address to the per-node assigned scan ranges; + // map from a BE host address to the per-node assigned scan ranges; // records scan range assignment for a single fragment class FragmentScanRangeAssignment extends HashMap<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> { @@ -2580,6 +2590,7 @@ public class Coordinator { */ public void unsetFields() { this.rpcParams.unsetDescTbl(); + this.rpcParams.unsetFileScanParams(); this.rpcParams.unsetCoord(); this.rpcParams.unsetQueryGlobals(); this.rpcParams.unsetResourceInfo(); @@ -2731,6 +2742,7 @@ public class Coordinator { */ public void unsetFields() { this.rpcParams.unsetDescTbl(); + this.rpcParams.unsetFileScanParams(); this.rpcParams.unsetCoord(); this.rpcParams.unsetQueryGlobals(); this.rpcParams.unsetResourceInfo(); @@ -3148,6 +3160,8 @@ public class Coordinator { rf.getFilterId().asInt(), rf.toThrift()); } } + + params.setFileScanParams(fileScanRangeParamsMap); paramsList.add(params); } return paramsList; @@ -3186,6 +3200,8 @@ public class Coordinator { if (tWorkloadGroups != null) { params.setWorkloadGroups(tWorkloadGroups); } + + params.setFileScanParams(fileScanRangeParamsMap); res.put(instanceExecParam.host, params); } TPipelineFragmentParams params = res.get(instanceExecParam.host); @@ -3420,3 +3436,4 @@ 
public class Coordinator { } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 8a28c82ec9..7e1a4b6698 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -442,7 +442,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio private PFetchTableSchemaRequest getFetchTableStructureRequest() throws AnalysisException, TException { // set TFileScanRangeParams TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams(); - fileScanRangeParams.setFileType(getTFileType()); fileScanRangeParams.setFormatType(fileFormatType); fileScanRangeParams.setProperties(locationProperties); fileScanRangeParams.setFileAttributes(getFileAttributes()); @@ -466,9 +465,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio throw new AnalysisException("Can not get first file, please check uri."); } - fileScanRangeParams.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath())); // set TFileRangeDesc TFileRangeDesc fileRangeDesc = new TFileRangeDesc(); + fileRangeDesc.setFileType(getTFileType()); + fileRangeDesc.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath())); fileRangeDesc.setPath(firstFile.getPath()); fileRangeDesc.setStartOffset(0); fileRangeDesc.setSize(firstFile.getSize()); @@ -483,3 +483,4 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio } } + diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 40457481b7..95dc716b7b 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -420,6 +420,9 @@ struct TExecPlanFragmentParams { 
22: optional list<Types.TUniqueId> instances_sharing_hash_table; 23: optional string table_name; + + // scan node id -> scan range params, only for external file scan + 24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params } struct TExecPlanFragmentParamsList { @@ -632,6 +635,8 @@ struct TPipelineFragmentParams { 26: optional list<TPipelineWorkloadGroup> workload_groups 27: optional TTxnParams txn_conf 28: optional string table_name + // scan node id -> scan range params, only for external file scan + 29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params } struct TPipelineFragmentParamsList { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1b74b0cc70..9dacaac889 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -335,8 +335,10 @@ struct TTableFormatFileDesc { } struct TFileScanRangeParams { + // deprecated, moved to TFileRangeDesc 1: optional Types.TFileType file_type; 2: optional TFileFormatType format_type; + // deprecated, moved to TFileRangeDesc 3: optional TFileCompressType compress_type; // If this is for load job, src point to the source table and dest point to the doris table. // If this is for query, only dest_tuple_id is set, including both file slot and partition slot. @@ -395,6 +397,8 @@ struct TFileRangeDesc { 8: optional TTableFormatFileDesc table_format_params // Use modification time to determine whether the file is changed 9: optional i64 modification_time + 10: optional Types.TFileType file_type; + 11: optional TFileCompressType compress_type; } // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it. 
@@ -402,6 +406,10 @@ struct TFileRangeDesc { // list<TFileRangeDesc>: file location and range struct TFileScanRange { 1: optional list<TFileRangeDesc> ranges + // If file_scan_params in TExecPlanFragmentParams is set, BE + // will use that field; otherwise, use this field. + // file_scan_params in TExecPlanFragmentParams will always be set in a query request, + // and the TFileScanRangeParams here is used for other requests, such as fetching the table schema for a tvf. 2: optional TFileScanRangeParams params } diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy index 455d0333f4..cd8e1ee501 100644 --- a/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy +++ b/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy @@ -285,7 +285,7 @@ suite("test_external_brown", "p2") { if (enabled != null && enabled.equalsIgnoreCase("true")) { String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") - String catalog_name = "external_yandex" + String catalog_name = "external_brown" sql """drop catalog if exists ${catalog_name};""" --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
