This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fc0a84735 [improvement](catalog) reduce the size thrift params for 
external table query (#21771)
5fc0a84735 is described below

commit 5fc0a84735ade1fb9f358538299ccd0aa0b009f1
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Jul 17 13:37:02 2023 +0800

    [improvement](catalog) reduce the size thrift params for external table 
query (#21771)
    
    ### 1
    In previous implementation, for each FileSplit, there will be a 
`TFileScanRange`, and each `TFileScanRange`
    contains a list of `TFileRangeDesc` and a `TFileScanRangeParams`.
    So if there are thousands of FileSplit, there will be thousands of 
`TFileScanRange`, which cause the thrift
    data send to BE too large, resulting in:
    
    1. the rpc of sending fragment may fail due to timeout
    2. FE will OOM
    
    For a certain query request, the `TFileScanRangeParams` is the common part 
and is same of all `TFileScanRange`.
    So I move this to the `TExecPlanFragmentParams`.
    After that, for each FileSplit, there is only a list of `TFileRangeDesc`.
    
    In my test, to query a hive table with 100000 partitions, the size of 
thrift data reduced from 151MB to 15MB,
    and the above 2 issues are gone.
    
    ### 2
    Support when setting `max_external_file_meta_cache_num` <=0, the file meta 
cache for parquet footer will
    not be used.
    Because I found that for some wide table, the footer is too large(1MB after 
compact, and much more after
    deserialized to thrift), it will consuming too much memory of BE when there 
are many files.
    
    This will be optimized later, here I just support to disable this cache.
---
 be/src/olap/push_handler.cpp                       |  7 +-
 be/src/runtime/fragment_mgr.cpp                    |  6 ++
 be/src/runtime/query_context.h                     |  4 ++
 be/src/vec/exec/format/csv/csv_reader.cpp          | 21 +++++-
 be/src/vec/exec/format/json/new_json_reader.cpp    |  7 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 30 ++++++--
 be/src/vec/exec/format/orc/vorc_reader.h           |  8 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  7 +-
 be/src/vec/exec/scan/avro_jni_reader.cpp           |  8 ++-
 be/src/vec/exec/scan/new_file_scan_node.cpp        | 11 ++-
 be/src/vec/exec/scan/vfile_scanner.cpp             | 67 ++++++++++--------
 be/src/vec/exec/scan/vfile_scanner.h               |  2 +-
 .../doris/planner/external/FileQueryScanNode.java  | 82 ++++++++++++----------
 .../main/java/org/apache/doris/qe/Coordinator.java | 19 ++++-
 .../ExternalFileTableValuedFunction.java           |  5 +-
 gensrc/thrift/PaloInternalService.thrift           |  5 ++
 gensrc/thrift/PlanNodes.thrift                     |  8 +++
 .../hive/test_external_brown.groovy                |  2 +-
 18 files changed, 211 insertions(+), 88 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 380576fdc1..d852b03814 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -322,7 +322,6 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, 
const TBrokerScanRange&
     if (0 == _ranges.size()) {
         return;
     }
-    _file_params.file_type = _ranges[0].file_type;
     _file_params.format_type = _ranges[0].format_type;
     _file_params.src_tuple_id = _params.src_tuple_id;
     _file_params.dest_tuple_id = _params.dest_tuple_id;
@@ -336,6 +335,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, 
const TBrokerScanRange&
 
     for (int i = 0; i < _ranges.size(); ++i) {
         TFileRangeDesc file_range;
+        // TODO(cmy): in previous implementation, the file_type is set in 
_file_params
+        // and it use _ranges[0].file_type.
+        // Later, this field is moved to TFileRangeDesc, but here we still 
only use _ranges[0]'s
+        // file_type.
+        // Because I don't know if other range has this field, so just keep it 
same as before.
+        file_range.file_type = _ranges[0].file_type;
         file_range.load_id = _ranges[i].load_id;
         file_range.path = _ranges[i].path;
         file_range.start_offset = _ranges[i].start_offset;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3f19d489d9..5bde9ad19a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -676,6 +676,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
         query_ctx->query_id = query_id;
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), 
params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
+
+        // set file scan range params
+        if (params.__isset.file_scan_params) {
+            query_ctx->file_scan_range_params_map = params.file_scan_params;
+        }
+
         query_ctx->coord_addr = params.coord;
         LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, 
query_ctx->query_id.lo)
                   << " coord_addr " << query_ctx->coord_addr
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8ed0150db5..c7651a687d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -197,6 +197,10 @@ public:
 
     std::vector<TUniqueId> fragment_ids;
 
+    // plan node id -> TFileScanRangeParams
+    // only for file scan node
+    std::map<int, TFileScanRangeParams> file_scan_range_params_map;
+
 private:
     ExecEnv* _exec_env;
     vectorized::VecDateTimeValue _start_time;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 09531a2c2b..55a43ed4c5 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -88,7 +88,12 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* 
profile, ScannerCounte
           _io_ctx(io_ctx) {
     _file_format_type = _params.format_type;
     _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
-    _file_compress_type = _params.compress_type;
+    if (_range.__isset.compress_type) {
+        // for compatibility
+        _file_compress_type = _range.compress_type;
+    } else {
+        _file_compress_type = _params.compress_type;
+    }
     _size = _range.size;
 
     _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
@@ -110,7 +115,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params
           _decompressor(nullptr),
           _io_ctx(io_ctx) {
     _file_format_type = _params.format_type;
-    _file_compress_type = _params.compress_type;
+    if (_range.__isset.compress_type) {
+        // for compatibility
+        _file_compress_type = _range.compress_type;
+    } else {
+        _file_compress_type = _params.compress_type;
+    }
     _size = _range.size;
     _init_system_properties();
     _init_file_description();
@@ -119,7 +129,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params
 CsvReader::~CsvReader() = default;
 
 void CsvReader::_init_system_properties() {
-    _system_properties.system_type = _params.file_type;
+    if (_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _range.file_type;
+    } else {
+        _system_properties.system_type = _params.file_type;
+    }
     _system_properties.properties = _params.properties;
     _system_properties.hdfs_params = _params.hdfs_params;
     if (_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 7d17f650c6..94c376ae69 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -135,7 +135,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, 
const TFileScanRangeParams
 }
 
 void NewJsonReader::_init_system_properties() {
-    _system_properties.system_type = _params.file_type;
+    if (_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _range.file_type;
+    } else {
+        _system_properties.system_type = _params.file_type;
+    }
     _system_properties.properties = _params.properties;
     _system_properties.hdfs_params = _params.hdfs_params;
     if (_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 7f87f77590..001d32b821 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -181,7 +181,9 @@ void OrcReader::_collect_profile_on_close() {
         COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
         COUNTER_UPDATE(_orc_profile.column_read_time, 
_statistics.column_read_time);
         COUNTER_UPDATE(_orc_profile.get_batch_time, 
_statistics.get_batch_time);
-        COUNTER_UPDATE(_orc_profile.parse_meta_time, 
_statistics.parse_meta_time);
+        COUNTER_UPDATE(_orc_profile.create_reader_time, 
_statistics.create_reader_time);
+        COUNTER_UPDATE(_orc_profile.init_column_time, 
_statistics.init_column_time);
+        COUNTER_UPDATE(_orc_profile.set_fill_column_time, 
_statistics.set_fill_column_time);
         COUNTER_UPDATE(_orc_profile.decode_value_time, 
_statistics.decode_value_time);
         COUNTER_UPDATE(_orc_profile.decode_null_map_time, 
_statistics.decode_null_map_time);
     }
@@ -200,7 +202,11 @@ void OrcReader::_init_profile() {
         _orc_profile.read_bytes = ADD_COUNTER(_profile, "FileReadBytes", 
TUnit::BYTES);
         _orc_profile.column_read_time = ADD_CHILD_TIMER(_profile, 
"ColumnReadTime", orc_profile);
         _orc_profile.get_batch_time = ADD_CHILD_TIMER(_profile, 
"GetBatchTime", orc_profile);
-        _orc_profile.parse_meta_time = ADD_CHILD_TIMER(_profile, 
"ParseMetaTime", orc_profile);
+        _orc_profile.create_reader_time =
+                ADD_CHILD_TIMER(_profile, "CreateReaderTime", orc_profile);
+        _orc_profile.init_column_time = ADD_CHILD_TIMER(_profile, 
"InitColumnTime", orc_profile);
+        _orc_profile.set_fill_column_time =
+                ADD_CHILD_TIMER(_profile, "SetFillColumnTime", orc_profile);
         _orc_profile.decode_value_time = ADD_CHILD_TIMER(_profile, 
"DecodeValueTime", orc_profile);
         _orc_profile.decode_null_map_time =
                 ADD_CHILD_TIMER(_profile, "DecodeNullMapTime", orc_profile);
@@ -254,9 +260,14 @@ Status OrcReader::init_reader(
                                  not_single_slot_filter_conjuncts->end());
     }
     _obj_pool = std::make_shared<ObjectPool>();
-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
-    RETURN_IF_ERROR(_create_file_reader());
-    RETURN_IF_ERROR(_init_read_columns());
+    {
+        SCOPED_RAW_TIMER(&_statistics.create_reader_time);
+        RETURN_IF_ERROR(_create_file_reader());
+    }
+    {
+        SCOPED_RAW_TIMER(&_statistics.init_column_time);
+        RETURN_IF_ERROR(_init_read_columns());
+    }
     return Status::OK();
 }
 
@@ -646,7 +657,7 @@ Status OrcReader::set_fill_columns(
         const std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>&
                 partition_columns,
         const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns) {
-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+    SCOPED_RAW_TIMER(&_statistics.set_fill_column_time);
 
     // std::unordered_map<column_name, std::pair<col_id, slot_id>>
     std::unordered_map<std::string, std::pair<uint32_t, int>> 
predicate_columns;
@@ -864,7 +875,12 @@ void OrcReader::_init_bloom_filter(
 }
 
 void OrcReader::_init_system_properties() {
-    _system_properties.system_type = _scan_params.file_type;
+    if (_scan_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _scan_range.file_type;
+    } else {
+        _system_properties.system_type = _scan_params.file_type;
+    }
     _system_properties.properties = _scan_params.properties;
     _system_properties.hdfs_params = _scan_params.hdfs_params;
     if (_scan_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 85a73e0e21..d05fb4c478 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -126,7 +126,9 @@ public:
         int64_t fs_read_bytes = 0;
         int64_t column_read_time = 0;
         int64_t get_batch_time = 0;
-        int64_t parse_meta_time = 0;
+        int64_t create_reader_time = 0;
+        int64_t init_column_time = 0;
+        int64_t set_fill_column_time = 0;
         int64_t decode_value_time = 0;
         int64_t decode_null_map_time = 0;
     };
@@ -200,7 +202,9 @@ private:
         RuntimeProfile::Counter* read_bytes;
         RuntimeProfile::Counter* column_read_time;
         RuntimeProfile::Counter* get_batch_time;
-        RuntimeProfile::Counter* parse_meta_time;
+        RuntimeProfile::Counter* create_reader_time;
+        RuntimeProfile::Counter* init_column_time;
+        RuntimeProfile::Counter* set_fill_column_time;
         RuntimeProfile::Counter* decode_value_time;
         RuntimeProfile::Counter* decode_null_map_time;
     };
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index fed33b6d28..ed4aa6e6ae 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -269,7 +269,12 @@ Status ParquetReader::open() {
 }
 
 void ParquetReader::_init_system_properties() {
-    _system_properties.system_type = _scan_params.file_type;
+    if (_scan_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _scan_range.file_type;
+    } else {
+        _system_properties.system_type = _scan_params.file_type;
+    }
     _system_properties.properties = _scan_params.properties;
     _system_properties.hdfs_params = _scan_params.hdfs_params;
     if (_scan_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/scan/avro_jni_reader.cpp 
b/be/src/vec/exec/scan/avro_jni_reader.cpp
index 5d1ef40cbc..ffca8682c4 100644
--- a/be/src/vec/exec/scan/avro_jni_reader.cpp
+++ b/be/src/vec/exec/scan/avro_jni_reader.cpp
@@ -74,7 +74,13 @@ Status AvroJNIReader::init_fetch_table_reader(
         index++;
     }
 
-    TFileType::type type = _params.file_type;
+    TFileType::type type;
+    if (_range.__isset.file_type) {
+        // for compatibility
+        type = _range.file_type;
+    } else {
+        type = _params.file_type;
+    }
     std::map<String, String> required_param = {
             {"required_fields", required_fields.str()},
             {"columns_types", columns_types.str()},
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp 
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 127539d26e..0b8d318551 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -49,6 +49,12 @@ Status NewFileScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 
 Status NewFileScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::prepare(state));
+    if (state->get_query_ctx() != nullptr &&
+        state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
+        TFileScanRangeParams& params = 
state->get_query_ctx()->file_scan_range_params_map[id()];
+        _input_tuple_id = params.src_tuple_id;
+        _output_tuple_id = params.dest_tuple_id;
+    }
     return Status::OK();
 }
 
@@ -74,7 +80,10 @@ void NewFileScanNode::set_scan_ranges(const 
std::vector<TScanRangeParams>& scan_
         _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << 
_scan_ranges.size();
     }
-    if (scan_ranges.size() > 0) {
+    if (scan_ranges.size() > 0 &&
+        
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
+        // for compatibility.
+        // in new implement, the tuple id is set in prepare phase
         _input_tuple_id =
                 
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
         _output_tuple_id =
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index b95d2aa773..38574262de 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -89,7 +89,6 @@ VFileScanner::VFileScanner(RuntimeState* state, 
NewFileScanNode* parent, int64_t
                            const TFileScanRange& scan_range, RuntimeProfile* 
profile,
                            ShardedKVCache* kv_cache)
         : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
-          _params(scan_range.params),
           _ranges(scan_range.ranges),
           _next_range(0),
           _cur_reader(nullptr),
@@ -99,6 +98,14 @@ VFileScanner::VFileScanner(RuntimeState* state, 
NewFileScanNode* parent, int64_t
     if (scan_range.params.__isset.strict_mode) {
         _strict_mode = scan_range.params.strict_mode;
     }
+
+    if (state->get_query_ctx() != nullptr &&
+        state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) 
> 0) {
+        _params = 
&(state->get_query_ctx()->file_scan_range_params_map[parent->id()]);
+    } else {
+        CHECK(scan_range.__isset.params);
+        _params = &(scan_range.params);
+    }
 }
 
 Status VFileScanner::prepare(
@@ -133,13 +140,13 @@ Status VFileScanner::prepare(
                                               
std::vector<TupleId>({_input_tuple_desc->id()}),
                                               std::vector<bool>({false})));
         // prepare pre filters
-        if (_params.__isset.pre_filter_exprs_list) {
+        if (_params->__isset.pre_filter_exprs_list) {
             RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_trees(
-                    _params.pre_filter_exprs_list, _pre_conjunct_ctxs));
-        } else if (_params.__isset.pre_filter_exprs) {
+                    _params->pre_filter_exprs_list, _pre_conjunct_ctxs));
+        } else if (_params->__isset.pre_filter_exprs) {
             VExprContextSPtr context;
             RETURN_IF_ERROR(
-                    
doris::vectorized::VExpr::create_expr_tree(_params.pre_filter_exprs, context));
+                    
doris::vectorized::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
             _pre_conjunct_ctxs.emplace_back(context);
         }
 
@@ -569,9 +576,10 @@ Status VFileScanner::_get_next_reader() {
 
         // create reader for specific format
         Status init_status;
-        TFileFormatType::type format_type = _params.format_type;
+        TFileFormatType::type format_type = _params->format_type;
         // JNI reader can only push down column value range
-        bool push_down_predicates = !_is_load && _params.format_type != 
TFileFormatType::FORMAT_JNI;
+        bool push_down_predicates =
+                !_is_load && _params->format_type != 
TFileFormatType::FORMAT_JNI;
         if (format_type == TFileFormatType::FORMAT_JNI && 
range.__isset.table_format_params &&
             range.table_format_params.table_format_type == "hudi") {
             if (range.table_format_params.hudi_params.delta_logs.empty()) {
@@ -598,9 +606,9 @@ Status VFileScanner::_get_next_reader() {
                                       ->init_reader(_colname_to_value_range);
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == "hudi") {
-                _cur_reader =
-                        HudiJniReader::create_unique(_params, 
range.table_format_params.hudi_params,
-                                                     _file_slot_descs, _state, 
_profile);
+                _cur_reader = HudiJniReader::create_unique(*_params,
+                                                           
range.table_format_params.hudi_params,
+                                                           _file_slot_descs, 
_state, _profile);
                 init_status =
                         
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
             }
@@ -608,9 +616,11 @@ Status VFileScanner::_get_next_reader() {
         }
         case TFileFormatType::FORMAT_PARQUET: {
             std::unique_ptr<ParquetReader> parquet_reader = 
ParquetReader::create_unique(
-                    _profile, _params, range, 
_state->query_options().batch_size,
+                    _profile, *_params, range, 
_state->query_options().batch_size,
                     const_cast<cctz::time_zone*>(&_state->timezone_obj()), 
_io_ctx.get(), _state,
-                    ExecEnv::GetInstance()->file_meta_cache(),
+                    config::max_external_file_meta_cache_num <= 0
+                            ? nullptr
+                            : ExecEnv::GetInstance()->file_meta_cache(),
                     _state->query_options().enable_parquet_lazy_mat);
             {
                 SCOPED_TIMER(_open_reader_timer);
@@ -627,7 +637,7 @@ Status VFileScanner::_get_next_reader() {
                 range.table_format_params.table_format_type == "iceberg") {
                 std::unique_ptr<IcebergTableReader> iceberg_reader =
                         
IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
-                                                          _state, _params, 
range, _kv_cache,
+                                                          _state, *_params, 
range, _kv_cache,
                                                           _io_ctx.get());
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _col_id_name_map, 
_colname_to_value_range,
@@ -649,7 +659,7 @@ Status VFileScanner::_get_next_reader() {
         }
         case TFileFormatType::FORMAT_ORC: {
             std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
-                    _profile, _state, _params, range, 
_state->query_options().batch_size,
+                    _profile, _state, *_params, range, 
_state->query_options().batch_size,
                     _state->timezone(), _io_ctx.get(), 
_state->query_options().enable_orc_lazy_mat);
             if (push_down_predicates && _push_down_conjuncts.empty() && 
!_conjuncts.empty()) {
                 _push_down_conjuncts.resize(_conjuncts.size());
@@ -662,7 +672,7 @@ Status VFileScanner::_get_next_reader() {
                 range.table_format_params.table_format_type == 
"transactional_hive") {
                 std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
                         
TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
-                                                               _state, 
_params, range,
+                                                               _state, 
*_params, range,
                                                                _io_ctx.get());
                 init_status = tran_orc_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
@@ -686,13 +696,13 @@ Status VFileScanner::_get_next_reader() {
         case TFileFormatType::FORMAT_CSV_LZOP:
         case TFileFormatType::FORMAT_CSV_DEFLATE:
         case TFileFormatType::FORMAT_PROTO: {
-            _cur_reader = CsvReader::create_unique(_state, _profile, 
&_counter, _params, range,
+            _cur_reader = CsvReader::create_unique(_state, _profile, 
&_counter, *_params, range,
                                                    _file_slot_descs, 
_io_ctx.get());
             init_status = 
((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
             break;
         }
         case TFileFormatType::FORMAT_JSON: {
-            _cur_reader = NewJsonReader::create_unique(_state, _profile, 
&_counter, _params, range,
+            _cur_reader = NewJsonReader::create_unique(_state, _profile, 
&_counter, *_params, range,
                                                        _file_slot_descs, 
&_scanner_eof,
                                                        _io_ctx.get(), 
_is_dynamic_schema);
             init_status =
@@ -700,13 +710,14 @@ Status VFileScanner::_get_next_reader() {
             break;
         }
         case TFileFormatType::FORMAT_AVRO: {
-            _cur_reader = AvroJNIReader::create_unique(_state, _profile, 
_params, _file_slot_descs);
+            _cur_reader =
+                    AvroJNIReader::create_unique(_state, _profile, *_params, 
_file_slot_descs);
             init_status = ((AvroJNIReader*)(_cur_reader.get()))
                                   
->init_fetch_table_reader(_colname_to_value_range);
             break;
         }
         default:
-            return Status::InternalError("Not supported file format: {}", 
_params.format_type);
+            return Status::InternalError("Not supported file format: {}", 
_params->format_type);
         }
 
         if (init_status.is<END_OF_FILE>()) {
@@ -810,8 +821,8 @@ Status VFileScanner::_init_expr_ctxes() {
         }
     }
 
-    _num_of_columns_from_file = _params.num_of_columns_from_file;
-    for (const auto& slot_info : _params.required_slots) {
+    _num_of_columns_from_file = _params->num_of_columns_from_file;
+    for (const auto& slot_info : _params->required_slots) {
         auto slot_id = slot_info.slot_id;
         auto it = full_src_slot_map.find(slot_id);
         if (it == std::end(full_src_slot_map)) {
@@ -843,8 +854,8 @@ Status VFileScanner::_init_expr_ctxes() {
             continue;
         }
         vectorized::VExprContextSPtr ctx;
-        auto it = _params.default_value_of_src_slot.find(slot_desc->id());
-        if (it != std::end(_params.default_value_of_src_slot)) {
+        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
+        if (it != std::end(_params->default_value_of_src_slot)) {
             if (!it->second.nodes.empty()) {
                 
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
                 RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
@@ -857,14 +868,14 @@ Status VFileScanner::_init_expr_ctxes() {
 
     if (_is_load) {
         // follow desc expr map is only for load task.
-        bool has_slot_id_map = 
_params.__isset.dest_sid_to_src_sid_without_trans;
+        bool has_slot_id_map = 
_params->__isset.dest_sid_to_src_sid_without_trans;
         int idx = 0;
         for (auto slot_desc : _output_tuple_desc->slots()) {
             if (!slot_desc->is_materialized()) {
                 continue;
             }
-            auto it = _params.expr_of_dest_slot.find(slot_desc->id());
-            if (it == std::end(_params.expr_of_dest_slot)) {
+            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
+            if (it == std::end(_params->expr_of_dest_slot)) {
                 return Status::InternalError("No expr for dest slot, id={}, 
name={}",
                                              slot_desc->id(), 
slot_desc->col_name());
             }
@@ -879,8 +890,8 @@ Status VFileScanner::_init_expr_ctxes() {
             _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
 
             if (has_slot_id_map) {
-                auto it1 = 
_params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
-                if (it1 == 
std::end(_params.dest_sid_to_src_sid_without_trans)) {
+                auto it1 = 
_params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
+                if (it1 == 
std::end(_params->dest_sid_to_src_sid_without_trans)) {
                     _src_slot_descs_order_by_dest.emplace_back(nullptr);
                 } else {
                     auto _src_slot_it = full_src_slot_map.find(it1->second);
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index 0b1d7a558a..9a16d056ed 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -89,7 +89,7 @@ protected:
 
 protected:
     std::unique_ptr<TextConverter> _text_converter;
-    const TFileScanRangeParams& _params;
+    const TFileScanRangeParams* _params;
     const std::vector<TFileRangeDesc>& _ranges;
     int _next_range;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 09ed6361e5..5b6238bf14 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -180,11 +180,6 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         }
         // Update required slots and column_idxs in scanRangeLocations.
         setColumnPositionMapping();
-        for (TScanRangeLocations location : scanRangeLocations) {
-            TFileScanRangeParams rangeParams = 
location.getScanRange().getExtScanRange().getFileScanRange().getParams();
-            rangeParams.setRequiredSlots(params.getRequiredSlots());
-            rangeParams.setColumnIdxs(params.getColumnIdxs());
-        }
     }
 
     @Override
@@ -227,6 +222,10 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         params.setColumnIdxs(columnIdxs);
     }
 
+    public TFileScanRangeParams getFileScanRangeParams() {
+        return params;
+    }
+
     @Override
     public void createScanRangeLocations() throws UserException {
         long start = System.currentTimeMillis();
@@ -237,41 +236,24 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         }
         TFileFormatType fileFormatType = getFileFormatType();
         params.setFormatType(fileFormatType);
+        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || 
fileFormatType == TFileFormatType.FORMAT_JSON;
+        if (isCsvOrJson) {
+            params.setFileAttributes(getFileAttributes());
+        }
+
+        Map<String, String> locationProperties = getLocationProperties();
+        // for JNI, only need to set properties
+        if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+            params.setProperties(locationProperties);
+        }
+
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
-            TFileScanRangeParams scanRangeParams = new 
TFileScanRangeParams(params);
             FileSplit fileSplit = (FileSplit) split;
             TFileType locationType = 
getLocationType(fileSplit.getPath().toString());
-            scanRangeParams.setFileType(locationType);
-            TFileCompressType fileCompressType = 
getFileCompressType(fileSplit);
-            scanRangeParams.setCompressType(fileCompressType);
-            boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || 
fileFormatType == TFileFormatType.FORMAT_JSON;
-            if (isCsvOrJson) {
-                scanRangeParams.setFileAttributes(getFileAttributes());
-            }
-
-            // set hdfs params for hdfs file type.
-            Map<String, String> locationProperties = getLocationProperties();
-            if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType 
== TFileType.FILE_S3) {
-                scanRangeParams.setProperties(locationProperties);
-            }
-            if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
-                String fsName = getFsName(fileSplit);
-                THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
-                tHdfsParams.setFsName(fsName);
-                scanRangeParams.setHdfsParams(tHdfsParams);
-
-                if (locationType == TFileType.FILE_BROKER) {
-                    FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                    if (broker == null) {
-                        throw new UserException("No alive broker.");
-                    }
-                    scanRangeParams.addToBrokerAddresses(new 
TNetworkAddress(broker.host, broker.port));
-                }
-            }
-
-            TScanRangeLocations curLocations = newLocations(scanRangeParams);
+            setLocationPropertiesIfNecessary(locationType, fileSplit, 
locationProperties);
 
+            TScanRangeLocations curLocations = newLocations();
             // If fileSplit has partition values, use the values collected 
from hive partitions.
             // Otherwise, use the values in file path.
             boolean isACID = false;
@@ -285,6 +267,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
 
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys,
                     locationType);
+            TFileCompressType fileCompressType = 
getFileCompressType(fileSplit);
+            rangeDesc.setCompressType(fileCompressType);
             if (isACID) {
                 HiveSplit hiveSplit = (HiveSplit) split;
                 
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -332,11 +316,31 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 scanRangeLocations.size(), (System.currentTimeMillis() - 
start));
     }
 
-    private TScanRangeLocations newLocations(TFileScanRangeParams params) {
+    private void setLocationPropertiesIfNecessary(TFileType locationType, 
FileSplit fileSplit,
+            Map<String, String> locationProperties) throws UserException {
+        if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
+            if (!params.isSetHdfsParams()) {
+                String fsName = getFsName(fileSplit);
+                THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
+                tHdfsParams.setFsName(fsName);
+                params.setHdfsParams(tHdfsParams);
+            }
+
+            if (locationType == TFileType.FILE_BROKER && 
!params.isSetBrokerAddresses()) {
+                FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                if (broker == null) {
+                    throw new UserException("No alive broker.");
+                }
+                params.addToBrokerAddresses(new TNetworkAddress(broker.host, 
broker.port));
+            }
+        } else if (locationType == TFileType.FILE_S3 && 
!params.isSetProperties()) {
+            params.setProperties(locationProperties);
+        }
+    }
+
+    private TScanRangeLocations newLocations() {
         // Generate on file scan range
         TFileScanRange fileScanRange = new TFileScanRange();
-        fileScanRange.setParams(params);
-
         // Scan range
         TExternalScanRange externalScanRange = new TExternalScanRange();
         externalScanRange.setFileScanRange(fileScanRange);
@@ -361,6 +365,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         rangeDesc.setColumnsFromPath(columnsFromPath);
         rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
 
+        rangeDesc.setFileType(locationType);
         if (locationType == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
         } else if (locationType == TFileType.FILE_S3
@@ -424,3 +429,4 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
     }
 }
 
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 180e3e1c75..a81bd1984b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -61,6 +61,7 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.planner.external.ExternalScanNode;
+import org.apache.doris.planner.external.FileQueryScanNode;
 import org.apache.doris.planner.external.FileScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -83,6 +84,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
+import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -165,6 +167,9 @@ public class Coordinator {
     // copied from TQueryExecRequest; constant across all fragments
     private final TDescriptorTable descTable;
 
+    // scan node id -> TFileScanRangeParams
+    private Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap = 
Maps.newHashMap();
+
     // Why do we use query global?
     // When `NOW()` function is in sql, we need only one now(),
     // but, we execute `NOW()` distributed.
@@ -1937,6 +1942,11 @@ public class Coordinator {
                     k -> Sets.newHashSet());
             scanNodeIds.add(scanNode.getId().asInt());
 
+            if (scanNode instanceof FileQueryScanNode) {
+                fileScanRangeParamsMap.put(
+                        scanNode.getId().asInt(), ((FileQueryScanNode) 
scanNode).getFileScanRangeParams());
+            }
+
             FragmentScanRangeAssignment assignment
                     = 
fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
             boolean fragmentContainsColocateJoin = 
isColocateFragment(scanNode.getFragment(),
@@ -2294,7 +2304,7 @@ public class Coordinator {
         return executionProfile.isAllInstancesDone();
     }
 
-    // map from an impalad host address to the per-node assigned scan ranges;
+    // map from a BE host address to the per-node assigned scan ranges;
     // records scan range assignment for a single fragment
     class FragmentScanRangeAssignment
             extends HashMap<TNetworkAddress, Map<Integer, 
List<TScanRangeParams>>> {
@@ -2580,6 +2590,7 @@ public class Coordinator {
          */
         public void unsetFields() {
             this.rpcParams.unsetDescTbl();
+            this.rpcParams.unsetFileScanParams();
             this.rpcParams.unsetCoord();
             this.rpcParams.unsetQueryGlobals();
             this.rpcParams.unsetResourceInfo();
@@ -2731,6 +2742,7 @@ public class Coordinator {
          */
         public void unsetFields() {
             this.rpcParams.unsetDescTbl();
+            this.rpcParams.unsetFileScanParams();
             this.rpcParams.unsetCoord();
             this.rpcParams.unsetQueryGlobals();
             this.rpcParams.unsetResourceInfo();
@@ -3148,6 +3160,8 @@ public class Coordinator {
                                 rf.getFilterId().asInt(), rf.toThrift());
                     }
                 }
+
+                params.setFileScanParams(fileScanRangeParamsMap);
                 paramsList.add(params);
             }
             return paramsList;
@@ -3186,6 +3200,8 @@ public class Coordinator {
                     if (tWorkloadGroups != null) {
                         params.setWorkloadGroups(tWorkloadGroups);
                     }
+
+                    params.setFileScanParams(fileScanRangeParamsMap);
                     res.put(instanceExecParam.host, params);
                 }
                 TPipelineFragmentParams params = 
res.get(instanceExecParam.host);
@@ -3420,3 +3436,4 @@ public class Coordinator {
 }
 
 
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index ddc28de961..49961afcbc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -467,7 +467,6 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
     private PFetchTableSchemaRequest getFetchTableStructureRequest() throws 
AnalysisException, TException {
         // set TFileScanRangeParams
         TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
-        fileScanRangeParams.setFileType(getTFileType());
         fileScanRangeParams.setFormatType(fileFormatType);
         fileScanRangeParams.setProperties(locationProperties);
         fileScanRangeParams.setFileAttributes(getFileAttributes());
@@ -491,9 +490,10 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
             throw new AnalysisException("Can not get first file, please check 
uri.");
         }
 
-        
fileScanRangeParams.setCompressType(Util.getOrInferCompressType(compressionType,
 firstFile.getPath()));
         // set TFileRangeDesc
         TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
+        fileRangeDesc.setFileType(getTFileType());
+        
fileRangeDesc.setCompressType(Util.getOrInferCompressType(compressionType, 
firstFile.getPath()));
         fileRangeDesc.setPath(firstFile.getPath());
         fileRangeDesc.setStartOffset(0);
         fileRangeDesc.setSize(firstFile.getSize());
@@ -508,3 +508,4 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
     }
 }
 
+
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 40457481b7..95dc716b7b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -420,6 +420,9 @@ struct TExecPlanFragmentParams {
 
   22: optional list<Types.TUniqueId> instances_sharing_hash_table;
   23: optional string table_name;
+
+  // scan node id -> scan range params, only for external file scan
+  24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
 }
 
 struct TExecPlanFragmentParamsList {
@@ -632,6 +635,8 @@ struct TPipelineFragmentParams {
   26: optional list<TPipelineWorkloadGroup> workload_groups
   27: optional TTxnParams txn_conf
   28: optional string table_name
+  // scan node id -> scan range params, only for external file scan
+  29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
 }
 
 struct TPipelineFragmentParamsList {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1b74b0cc70..9dacaac889 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -335,8 +335,10 @@ struct TTableFormatFileDesc {
 }
 
 struct TFileScanRangeParams {
+    // deprecated, move to TFileScanRange
     1: optional Types.TFileType file_type;
     2: optional TFileFormatType format_type;
+    // deprecated, move to TFileScanRange
     3: optional TFileCompressType compress_type;
     // If this is for load job, src point to the source table and dest point 
to the doris table.
     // If this is for query, only dest_tuple_id is set, including both file 
slot and partition slot.
@@ -395,6 +397,8 @@ struct TFileRangeDesc {
     8: optional TTableFormatFileDesc table_format_params
     // Use modification time to determine whether the file is changed
     9: optional i64 modification_time
+    10: optional Types.TFileType file_type;
+    11: optional TFileCompressType compress_type;
 }
 
 // TFileScanRange represents a set of descriptions of a file and the rules for 
reading and converting it.
@@ -402,6 +406,10 @@ struct TFileRangeDesc {
 //  list<TFileRangeDesc>: file location and range
 struct TFileScanRange {
     1: optional list<TFileRangeDesc> ranges
+    // If file_scan_params in TExecPlanFragmentParams is set in 
TExecPlanFragmentParams
+    // will use that field, otherwise, use this field.
+    // file_scan_params in TExecPlanFragmentParams will always be set in query 
request,
+    // and TFileScanRangeParams here is used for some other request such as 
fetch table schema for tvf. 
     2: optional TFileScanRangeParams params
 }
 
diff --git 
a/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy 
b/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy
index 455d0333f4..cd8e1ee501 100644
--- 
a/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy
+++ 
b/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy
@@ -285,7 +285,7 @@ suite("test_external_brown", "p2") {
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String extHiveHmsHost = 
context.config.otherConfigs.get("extHiveHmsHost")
         String extHiveHmsPort = 
context.config.otherConfigs.get("extHiveHmsPort")
-        String catalog_name = "external_yandex"
+        String catalog_name = "external_brown"
 
         sql """drop catalog if exists ${catalog_name};"""
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to