This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 5fc0a84735 [improvement](catalog) reduce the size of thrift params for external table query (#21771)
5fc0a84735 is described below
commit 5fc0a84735ade1fb9f358538299ccd0aa0b009f1
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Jul 17 13:37:02 2023 +0800
    [improvement](catalog) reduce the size of thrift params for external table query (#21771)
### 1
    In the previous implementation, each FileSplit produced a `TFileScanRange`, and each `TFileScanRange`
    contained a list of `TFileRangeDesc` plus its own `TFileScanRangeParams`.
    So a query with thousands of FileSplits produced thousands of `TFileScanRange`s, making the thrift
    data sent to BE too large and resulting in:
    1. the RPC that sends the fragment may fail due to timeout
    2. the FE may OOM
    For a given query request, the `TFileScanRangeParams` is the common part and is identical across all
    `TFileScanRange`s, so I moved it to `TExecPlanFragmentParams`.
    After this change, each FileSplit carries only a list of `TFileRangeDesc`.
    In my test, querying a Hive table with 100000 partitions, the size of the thrift data dropped from
    151MB to 15MB, and both issues above are gone.
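    On the BE side, the shared params are now looked up once per scan node from the query context,
    with a fallback to the per-range copy for compatibility with older FEs. A minimal sketch of that
    resolution logic, mirroring the `vfile_scanner.cpp` change below (`resolve_params` is an
    illustrative helper, not a function in this patch):

    ```cpp
    // Prefer the per-query TFileScanRangeParams shipped once in
    // TExecPlanFragmentParams; fall back to the per-range copy.
    const TFileScanRangeParams* resolve_params(QueryContext* query_ctx, int node_id,
                                               const TFileScanRange& scan_range) {
        if (query_ctx != nullptr && query_ctx->file_scan_range_params_map.count(node_id) > 0) {
            // new FE: common params are sent once per query, keyed by plan node id
            return &query_ctx->file_scan_range_params_map[node_id];
        }
        // old FE: every TFileScanRange still carries its own params
        CHECK(scan_range.__isset.params);
        return &scan_range.params;
    }
    ```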
### 2
    Support disabling the file meta cache for parquet footers by setting
    `max_external_file_meta_cache_num` <= 0.
    I found that for some wide tables the footer is too large (1MB after compaction, and much more
    after being deserialized to thrift), so it consumes too much BE memory when there are many files.
    This will be optimized later; for now this change simply allows the cache to be disabled.
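    The disable switch is just a null-cache fallback when constructing the parquet reader; a sketch
    of the relevant expression (see the `vfile_scanner.cpp` hunk below; the `FileMetaCache` type
    name is an assumption):

    ```cpp
    // When the cache is disabled, pass a null meta cache so the parquet
    // footer is parsed per file instead of being cached in BE memory.
    FileMetaCache* meta_cache = config::max_external_file_meta_cache_num <= 0
            ? nullptr // footer cache disabled
            : ExecEnv::GetInstance()->file_meta_cache();
    ```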
---
be/src/olap/push_handler.cpp | 7 +-
be/src/runtime/fragment_mgr.cpp | 6 ++
be/src/runtime/query_context.h | 4 ++
be/src/vec/exec/format/csv/csv_reader.cpp | 21 +++++-
be/src/vec/exec/format/json/new_json_reader.cpp | 7 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 30 ++++++--
be/src/vec/exec/format/orc/vorc_reader.h | 8 ++-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 7 +-
be/src/vec/exec/scan/avro_jni_reader.cpp | 8 ++-
be/src/vec/exec/scan/new_file_scan_node.cpp | 11 ++-
be/src/vec/exec/scan/vfile_scanner.cpp | 67 ++++++++++--------
be/src/vec/exec/scan/vfile_scanner.h | 2 +-
.../doris/planner/external/FileQueryScanNode.java | 82 ++++++++++++----------
.../main/java/org/apache/doris/qe/Coordinator.java | 19 ++++-
.../ExternalFileTableValuedFunction.java | 5 +-
gensrc/thrift/PaloInternalService.thrift | 5 ++
gensrc/thrift/PlanNodes.thrift | 8 +++
.../hive/test_external_brown.groovy | 2 +-
18 files changed, 211 insertions(+), 88 deletions(-)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 380576fdc1..d852b03814 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -322,7 +322,6 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
if (0 == _ranges.size()) {
return;
}
- _file_params.file_type = _ranges[0].file_type;
_file_params.format_type = _ranges[0].format_type;
_file_params.src_tuple_id = _params.src_tuple_id;
_file_params.dest_tuple_id = _params.dest_tuple_id;
@@ -336,6 +335,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
for (int i = 0; i < _ranges.size(); ++i) {
TFileRangeDesc file_range;
+        // TODO(cmy): in previous implementation, the file_type is set in _file_params
+        // and it use _ranges[0].file_type.
+        // Later, this field is moved to TFileRangeDesc, but here we still only use _ranges[0]'s file_type.
+        // Because I don't know if other range has this field, so just keep it same as before.
+        file_range.file_type = _ranges[0].file_type;
file_range.load_id = _ranges[i].load_id;
file_range.path = _ranges[i].path;
file_range.start_offset = _ranges[i].start_offset;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3f19d489d9..5bde9ad19a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -676,6 +676,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx->query_id = query_id;
     RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                           &(query_ctx->desc_tbl)));
+
+ // set file scan range params
+ if (params.__isset.file_scan_params) {
+ query_ctx->file_scan_range_params_map = params.file_scan_params;
+ }
+
query_ctx->coord_addr = params.coord;
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi,
query_ctx->query_id.lo)
<< " coord_addr " << query_ctx->coord_addr
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8ed0150db5..c7651a687d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -197,6 +197,10 @@ public:
std::vector<TUniqueId> fragment_ids;
+ // plan node id -> TFileScanRangeParams
+ // only for file scan node
+ std::map<int, TFileScanRangeParams> file_scan_range_params_map;
+
private:
ExecEnv* _exec_env;
vectorized::VecDateTimeValue _start_time;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 09531a2c2b..55a43ed4c5 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -88,7 +88,12 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
_io_ctx(io_ctx) {
_file_format_type = _params.format_type;
_is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
- _file_compress_type = _params.compress_type;
+ if (_range.__isset.compress_type) {
+ // for compatibility
+ _file_compress_type = _range.compress_type;
+ } else {
+ _file_compress_type = _params.compress_type;
+ }
_size = _range.size;
_split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
@@ -110,7 +115,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
_decompressor(nullptr),
_io_ctx(io_ctx) {
_file_format_type = _params.format_type;
- _file_compress_type = _params.compress_type;
+ if (_range.__isset.compress_type) {
+ // for compatibility
+ _file_compress_type = _range.compress_type;
+ } else {
+ _file_compress_type = _params.compress_type;
+ }
_size = _range.size;
_init_system_properties();
_init_file_description();
@@ -119,7 +129,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
CsvReader::~CsvReader() = default;
void CsvReader::_init_system_properties() {
- _system_properties.system_type = _params.file_type;
+ if (_range.__isset.file_type) {
+ // for compatibility
+ _system_properties.system_type = _range.file_type;
+ } else {
+ _system_properties.system_type = _params.file_type;
+ }
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 7d17f650c6..94c376ae69 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -135,7 +135,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams
}
void NewJsonReader::_init_system_properties() {
- _system_properties.system_type = _params.file_type;
+ if (_range.__isset.file_type) {
+ // for compatibility
+ _system_properties.system_type = _range.file_type;
+ } else {
+ _system_properties.system_type = _params.file_type;
+ }
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 7f87f77590..001d32b821 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -181,7 +181,9 @@ void OrcReader::_collect_profile_on_close() {
COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
     COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
     COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
-    COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time);
+    COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time);
+    COUNTER_UPDATE(_orc_profile.init_column_time, _statistics.init_column_time);
+    COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
     COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
     COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
}
@@ -200,7 +202,11 @@ void OrcReader::_init_profile() {
     _orc_profile.read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES);
     _orc_profile.column_read_time = ADD_CHILD_TIMER(_profile, "ColumnReadTime", orc_profile);
     _orc_profile.get_batch_time = ADD_CHILD_TIMER(_profile, "GetBatchTime", orc_profile);
-    _orc_profile.parse_meta_time = ADD_CHILD_TIMER(_profile, "ParseMetaTime", orc_profile);
+    _orc_profile.create_reader_time =
+            ADD_CHILD_TIMER(_profile, "CreateReaderTime", orc_profile);
+    _orc_profile.init_column_time = ADD_CHILD_TIMER(_profile, "InitColumnTime", orc_profile);
+    _orc_profile.set_fill_column_time =
+            ADD_CHILD_TIMER(_profile, "SetFillColumnTime", orc_profile);
     _orc_profile.decode_value_time = ADD_CHILD_TIMER(_profile, "DecodeValueTime", orc_profile);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER(_profile, "DecodeNullMapTime", orc_profile);
@@ -254,9 +260,14 @@ Status OrcReader::init_reader(
not_single_slot_filter_conjuncts->end());
}
_obj_pool = std::make_shared<ObjectPool>();
- SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
- RETURN_IF_ERROR(_create_file_reader());
- RETURN_IF_ERROR(_init_read_columns());
+ {
+ SCOPED_RAW_TIMER(&_statistics.create_reader_time);
+ RETURN_IF_ERROR(_create_file_reader());
+ }
+ {
+ SCOPED_RAW_TIMER(&_statistics.init_column_time);
+ RETURN_IF_ERROR(_init_read_columns());
+ }
return Status::OK();
}
@@ -646,7 +657,7 @@ Status OrcReader::set_fill_columns(
         const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                 partition_columns,
         const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+    SCOPED_RAW_TIMER(&_statistics.set_fill_column_time);
     // std::unordered_map<column_name, std::pair<col_id, slot_id>>
     std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
@@ -864,7 +875,12 @@ void OrcReader::_init_bloom_filter(
}
void OrcReader::_init_system_properties() {
- _system_properties.system_type = _scan_params.file_type;
+ if (_scan_range.__isset.file_type) {
+ // for compatibility
+ _system_properties.system_type = _scan_range.file_type;
+ } else {
+ _system_properties.system_type = _scan_params.file_type;
+ }
_system_properties.properties = _scan_params.properties;
_system_properties.hdfs_params = _scan_params.hdfs_params;
if (_scan_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 85a73e0e21..d05fb4c478 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -126,7 +126,9 @@ public:
int64_t fs_read_bytes = 0;
int64_t column_read_time = 0;
int64_t get_batch_time = 0;
- int64_t parse_meta_time = 0;
+ int64_t create_reader_time = 0;
+ int64_t init_column_time = 0;
+ int64_t set_fill_column_time = 0;
int64_t decode_value_time = 0;
int64_t decode_null_map_time = 0;
};
@@ -200,7 +202,9 @@ private:
RuntimeProfile::Counter* read_bytes;
RuntimeProfile::Counter* column_read_time;
RuntimeProfile::Counter* get_batch_time;
- RuntimeProfile::Counter* parse_meta_time;
+ RuntimeProfile::Counter* create_reader_time;
+ RuntimeProfile::Counter* init_column_time;
+ RuntimeProfile::Counter* set_fill_column_time;
RuntimeProfile::Counter* decode_value_time;
RuntimeProfile::Counter* decode_null_map_time;
};
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index fed33b6d28..ed4aa6e6ae 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -269,7 +269,12 @@ Status ParquetReader::open() {
}
void ParquetReader::_init_system_properties() {
- _system_properties.system_type = _scan_params.file_type;
+ if (_scan_range.__isset.file_type) {
+ // for compatibility
+ _system_properties.system_type = _scan_range.file_type;
+ } else {
+ _system_properties.system_type = _scan_params.file_type;
+ }
_system_properties.properties = _scan_params.properties;
_system_properties.hdfs_params = _scan_params.hdfs_params;
if (_scan_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/scan/avro_jni_reader.cpp b/be/src/vec/exec/scan/avro_jni_reader.cpp
index 5d1ef40cbc..ffca8682c4 100644
--- a/be/src/vec/exec/scan/avro_jni_reader.cpp
+++ b/be/src/vec/exec/scan/avro_jni_reader.cpp
@@ -74,7 +74,13 @@ Status AvroJNIReader::init_fetch_table_reader(
index++;
}
- TFileType::type type = _params.file_type;
+ TFileType::type type;
+ if (_range.__isset.file_type) {
+ // for compatibility
+ type = _range.file_type;
+ } else {
+ type = _params.file_type;
+ }
std::map<String, String> required_param = {
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()},
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 127539d26e..0b8d318551 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -49,6 +49,12 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
+ if (state->get_query_ctx() != nullptr &&
+ state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
+        TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[id()];
+ _input_tuple_id = params.src_tuple_id;
+ _output_tuple_id = params.dest_tuple_id;
+ }
return Status::OK();
}
@@ -74,7 +80,10 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
         _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
- if (scan_ranges.size() > 0) {
+    if (scan_ranges.size() > 0 &&
+        scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
+ // for compatibility.
+ // in new implement, the tuple id is set in prepare phase
_input_tuple_id =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
_output_tuple_id =
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b95d2aa773..38574262de 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -89,7 +89,6 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
                            const TFileScanRange& scan_range, RuntimeProfile* profile,
ShardedKVCache* kv_cache)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
- _params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
_cur_reader(nullptr),
@@ -99,6 +98,14 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
if (scan_range.params.__isset.strict_mode) {
_strict_mode = scan_range.params.strict_mode;
}
+
+    if (state->get_query_ctx() != nullptr &&
+        state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 0) {
+        _params = &(state->get_query_ctx()->file_scan_range_params_map[parent->id()]);
+ } else {
+ CHECK(scan_range.__isset.params);
+ _params = &(scan_range.params);
+ }
}
Status VFileScanner::prepare(
@@ -133,13 +140,13 @@ Status VFileScanner::prepare(
std::vector<TupleId>({_input_tuple_desc->id()}),
std::vector<bool>({false})));
// prepare pre filters
- if (_params.__isset.pre_filter_exprs_list) {
+ if (_params->__isset.pre_filter_exprs_list) {
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_trees(
- _params.pre_filter_exprs_list, _pre_conjunct_ctxs));
- } else if (_params.__isset.pre_filter_exprs) {
+ _params->pre_filter_exprs_list, _pre_conjunct_ctxs));
+ } else if (_params->__isset.pre_filter_exprs) {
VExprContextSPtr context;
RETURN_IF_ERROR(
-                doris::vectorized::VExpr::create_expr_tree(_params.pre_filter_exprs, context));
+                doris::vectorized::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
_pre_conjunct_ctxs.emplace_back(context);
}
@@ -569,9 +576,10 @@ Status VFileScanner::_get_next_reader() {
// create reader for specific format
Status init_status;
- TFileFormatType::type format_type = _params.format_type;
+ TFileFormatType::type format_type = _params->format_type;
// JNI reader can only push down column value range
-    bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI;
+    bool push_down_predicates =
+            !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
     if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
if (range.table_format_params.hudi_params.delta_logs.empty()) {
@@ -598,9 +606,9 @@ Status VFileScanner::_get_next_reader() {
->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
-            _cur_reader =
-                    HudiJniReader::create_unique(_params, range.table_format_params.hudi_params,
-                                                 _file_slot_descs, _state, _profile);
+            _cur_reader = HudiJniReader::create_unique(*_params,
+                                                       range.table_format_params.hudi_params,
+                                                       _file_slot_descs, _state, _profile);
init_status =
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
}
@@ -608,9 +616,11 @@ Status VFileScanner::_get_next_reader() {
}
case TFileFormatType::FORMAT_PARQUET: {
std::unique_ptr<ParquetReader> parquet_reader =
ParquetReader::create_unique(
-                _profile, _params, range, _state->query_options().batch_size,
+                _profile, *_params, range, _state->query_options().batch_size,
                 const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state,
- ExecEnv::GetInstance()->file_meta_cache(),
+ config::max_external_file_meta_cache_num <= 0
+ ? nullptr
+ : ExecEnv::GetInstance()->file_meta_cache(),
_state->query_options().enable_parquet_lazy_mat);
{
SCOPED_TIMER(_open_reader_timer);
@@ -627,7 +637,7 @@ Status VFileScanner::_get_next_reader() {
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergTableReader> iceberg_reader =
IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
-                                                      _state, _params, range, _kv_cache,
+                                                      _state, *_params, range, _kv_cache,
                                                       _io_ctx.get());
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _col_id_name_map, _colname_to_value_range,
@@ -649,7 +659,7 @@ Status VFileScanner::_get_next_reader() {
}
case TFileFormatType::FORMAT_ORC: {
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
-                _profile, _state, _params, range, _state->query_options().batch_size,
+                _profile, _state, *_params, range, _state->query_options().batch_size,
                 _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
         if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
_push_down_conjuncts.resize(_conjuncts.size());
@@ -662,7 +672,7 @@ Status VFileScanner::_get_next_reader() {
                    range.table_format_params.table_format_type == "transactional_hive") {
std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
-                                                           _state, _params, range,
+                                                           _state, *_params, range,
                                                            _io_ctx.get());
             init_status = tran_orc_reader->init_reader(
                     _file_col_names, _colname_to_value_range, _push_down_conjuncts,
@@ -686,13 +696,13 @@ Status VFileScanner::_get_next_reader() {
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
case TFileFormatType::FORMAT_PROTO: {
-        _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, _params, range,
+        _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                _file_slot_descs, _io_ctx.get());
         init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
break;
}
case TFileFormatType::FORMAT_JSON: {
-        _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, _params, range,
+        _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                    _file_slot_descs, &_scanner_eof, _io_ctx.get(),
                                                    _is_dynamic_schema);
         init_status =
@@ -700,13 +710,14 @@ Status VFileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_AVRO: {
-        _cur_reader = AvroJNIReader::create_unique(_state, _profile, _params, _file_slot_descs);
+        _cur_reader =
+                AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs);
init_status = ((AvroJNIReader*)(_cur_reader.get()))
->init_fetch_table_reader(_colname_to_value_range);
break;
}
default:
-        return Status::InternalError("Not supported file format: {}", _params.format_type);
+        return Status::InternalError("Not supported file format: {}", _params->format_type);
}
if (init_status.is<END_OF_FILE>()) {
@@ -810,8 +821,8 @@ Status VFileScanner::_init_expr_ctxes() {
}
}
- _num_of_columns_from_file = _params.num_of_columns_from_file;
- for (const auto& slot_info : _params.required_slots) {
+ _num_of_columns_from_file = _params->num_of_columns_from_file;
+ for (const auto& slot_info : _params->required_slots) {
auto slot_id = slot_info.slot_id;
auto it = full_src_slot_map.find(slot_id);
if (it == std::end(full_src_slot_map)) {
@@ -843,8 +854,8 @@ Status VFileScanner::_init_expr_ctxes() {
continue;
}
vectorized::VExprContextSPtr ctx;
- auto it = _params.default_value_of_src_slot.find(slot_desc->id());
- if (it != std::end(_params.default_value_of_src_slot)) {
+ auto it = _params->default_value_of_src_slot.find(slot_desc->id());
+ if (it != std::end(_params->default_value_of_src_slot)) {
if (!it->second.nodes.empty()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
@@ -857,14 +868,14 @@ Status VFileScanner::_init_expr_ctxes() {
if (_is_load) {
// follow desc expr map is only for load task.
-        bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
+        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
int idx = 0;
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
- auto it = _params.expr_of_dest_slot.find(slot_desc->id());
- if (it == std::end(_params.expr_of_dest_slot)) {
+ auto it = _params->expr_of_dest_slot.find(slot_desc->id());
+ if (it == std::end(_params->expr_of_dest_slot)) {
return Status::InternalError("No expr for dest slot, id={},
name={}",
slot_desc->id(),
slot_desc->col_name());
}
@@ -879,8 +890,8 @@ Status VFileScanner::_init_expr_ctxes() {
_dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
if (has_slot_id_map) {
-                auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
-                if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
+                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
+                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
_src_slot_descs_order_by_dest.emplace_back(nullptr);
} else {
auto _src_slot_it = full_src_slot_map.find(it1->second);
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 0b1d7a558a..9a16d056ed 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -89,7 +89,7 @@ protected:
protected:
std::unique_ptr<TextConverter> _text_converter;
- const TFileScanRangeParams& _params;
+ const TFileScanRangeParams* _params;
const std::vector<TFileRangeDesc>& _ranges;
int _next_range;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 09ed6361e5..5b6238bf14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -180,11 +180,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
// Update required slots and column_idxs in scanRangeLocations.
setColumnPositionMapping();
-        for (TScanRangeLocations location : scanRangeLocations) {
-            TFileScanRangeParams rangeParams = location.getScanRange().getExtScanRange().getFileScanRange().getParams();
-            rangeParams.setRequiredSlots(params.getRequiredSlots());
-            rangeParams.setColumnIdxs(params.getColumnIdxs());
- }
}
@Override
@@ -227,6 +222,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
params.setColumnIdxs(columnIdxs);
}
+ public TFileScanRangeParams getFileScanRangeParams() {
+ return params;
+ }
+
@Override
public void createScanRangeLocations() throws UserException {
long start = System.currentTimeMillis();
@@ -237,41 +236,24 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
TFileFormatType fileFormatType = getFileFormatType();
params.setFormatType(fileFormatType);
+        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
+ if (isCsvOrJson) {
+ params.setFileAttributes(getFileAttributes());
+ }
+
+ Map<String, String> locationProperties = getLocationProperties();
+ // for JNI, only need to set properties
+ if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+ params.setProperties(locationProperties);
+ }
+
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
-            TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params);
             FileSplit fileSplit = (FileSplit) split;
             TFileType locationType = getLocationType(fileSplit.getPath().toString());
-            scanRangeParams.setFileType(locationType);
-            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
-            scanRangeParams.setCompressType(fileCompressType);
-            boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
- if (isCsvOrJson) {
- scanRangeParams.setFileAttributes(getFileAttributes());
- }
-
-            // set hdfs params for hdfs file type.
-            Map<String, String> locationProperties = getLocationProperties();
-            if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
-                scanRangeParams.setProperties(locationProperties);
-            }
-            if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
-                String fsName = getFsName(fileSplit);
-                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
-                tHdfsParams.setFsName(fsName);
-                scanRangeParams.setHdfsParams(tHdfsParams);
-
-                if (locationType == TFileType.FILE_BROKER) {
-                    FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                    if (broker == null) {
-                        throw new UserException("No alive broker.");
-                    }
-                    scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
-                }
-            }
-
-            TScanRangeLocations curLocations = newLocations(scanRangeParams);
+            setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties);
+ TScanRangeLocations curLocations = newLocations();
             // If fileSplit has partition values, use the values collected from hive partitions.
// Otherwise, use the values in file path.
boolean isACID = false;
@@ -285,6 +267,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath,
                     pathPartitionKeys, locationType);
+            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
+            rangeDesc.setCompressType(fileCompressType);
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) split;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -332,11 +316,31 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 scanRangeLocations.size(), (System.currentTimeMillis() - start));
}
- private TScanRangeLocations newLocations(TFileScanRangeParams params) {
+    private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit,
+            Map<String, String> locationProperties) throws UserException {
+        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+            if (!params.isSetHdfsParams()) {
+                String fsName = getFsName(fileSplit);
+                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
+ tHdfsParams.setFsName(fsName);
+ params.setHdfsParams(tHdfsParams);
+ }
+
+            if (locationType == TFileType.FILE_BROKER && !params.isSetBrokerAddresses()) {
+                FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+ if (broker == null) {
+ throw new UserException("No alive broker.");
+ }
+                params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
+            }
+        } else if (locationType == TFileType.FILE_S3 && !params.isSetProperties()) {
+ params.setProperties(locationProperties);
+ }
+ }
+
+ private TScanRangeLocations newLocations() {
// Generate on file scan range
TFileScanRange fileScanRange = new TFileScanRange();
- fileScanRange.setParams(params);
-
// Scan range
TExternalScanRange externalScanRange = new TExternalScanRange();
externalScanRange.setFileScanRange(fileScanRange);
@@ -361,6 +365,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
+ rangeDesc.setFileType(locationType);
if (locationType == TFileType.FILE_HDFS) {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
} else if (locationType == TFileType.FILE_S3
@@ -424,3 +429,4 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 180e3e1c75..a81bd1984b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -61,6 +61,7 @@ import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.planner.external.ExternalScanNode;
+import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.planner.external.FileScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -83,6 +84,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
+import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -165,6 +167,9 @@ public class Coordinator {
// copied from TQueryExecRequest; constant across all fragments
private final TDescriptorTable descTable;
+ // scan node id -> TFileScanRangeParams
+    private Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap = Maps.newHashMap();
+
// Why do we use query global?
// When `NOW()` function is in sql, we need only one now(),
// but, we execute `NOW()` distributed.
@@ -1937,6 +1942,11 @@ public class Coordinator {
k -> Sets.newHashSet());
scanNodeIds.add(scanNode.getId().asInt());
+ if (scanNode instanceof FileQueryScanNode) {
+ fileScanRangeParamsMap.put(
+                        scanNode.getId().asInt(), ((FileQueryScanNode) scanNode).getFileScanRangeParams());
+ }
+
             FragmentScanRangeAssignment assignment
                     = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
             boolean fragmentContainsColocateJoin = isColocateFragment(scanNode.getFragment(),
@@ -2294,7 +2304,7 @@ public class Coordinator {
return executionProfile.isAllInstancesDone();
}
- // map from an impalad host address to the per-node assigned scan ranges;
+ // map from a BE host address to the per-node assigned scan ranges;
// records scan range assignment for a single fragment
    class FragmentScanRangeAssignment
            extends HashMap<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> {
@@ -2580,6 +2590,7 @@ public class Coordinator {
*/
public void unsetFields() {
this.rpcParams.unsetDescTbl();
+ this.rpcParams.unsetFileScanParams();
this.rpcParams.unsetCoord();
this.rpcParams.unsetQueryGlobals();
this.rpcParams.unsetResourceInfo();
@@ -2731,6 +2742,7 @@ public class Coordinator {
*/
public void unsetFields() {
this.rpcParams.unsetDescTbl();
+ this.rpcParams.unsetFileScanParams();
this.rpcParams.unsetCoord();
this.rpcParams.unsetQueryGlobals();
this.rpcParams.unsetResourceInfo();
@@ -3148,6 +3160,8 @@ public class Coordinator {
rf.getFilterId().asInt(), rf.toThrift());
}
}
+
+            params.setFileScanParams(fileScanRangeParamsMap);
paramsList.add(params);
}
return paramsList;
@@ -3186,6 +3200,8 @@ public class Coordinator {
if (tWorkloadGroups != null) {
params.setWorkloadGroups(tWorkloadGroups);
}
+
+                params.setFileScanParams(fileScanRangeParamsMap);
res.put(instanceExecParam.host, params);
}
                 TPipelineFragmentParams params = res.get(instanceExecParam.host);
@@ -3420,3 +3436,4 @@ public class Coordinator {
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index ddc28de961..49961afcbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -467,7 +467,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     private PFetchTableSchemaRequest getFetchTableStructureRequest() throws AnalysisException, TException {
// set TFileScanRangeParams
TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
- fileScanRangeParams.setFileType(getTFileType());
fileScanRangeParams.setFormatType(fileFormatType);
fileScanRangeParams.setProperties(locationProperties);
fileScanRangeParams.setFileAttributes(getFileAttributes());
@@ -491,9 +490,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
             throw new AnalysisException("Can not get first file, please check uri.");
}
-        fileScanRangeParams.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
// set TFileRangeDesc
TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
+ fileRangeDesc.setFileType(getTFileType());
+        fileRangeDesc.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
fileRangeDesc.setPath(firstFile.getPath());
fileRangeDesc.setStartOffset(0);
fileRangeDesc.setSize(firstFile.getSize());
@@ -508,3 +508,4 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
}
}
+
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 40457481b7..95dc716b7b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -420,6 +420,9 @@ struct TExecPlanFragmentParams {
22: optional list<Types.TUniqueId> instances_sharing_hash_table;
23: optional string table_name;
+
+ // scan node id -> scan range params, only for external file scan
+  24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
}
struct TExecPlanFragmentParamsList {
@@ -632,6 +635,8 @@ struct TPipelineFragmentParams {
26: optional list<TPipelineWorkloadGroup> workload_groups
27: optional TTxnParams txn_conf
28: optional string table_name
+ // scan node id -> scan range params, only for external file scan
+  29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
}
struct TPipelineFragmentParamsList {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1b74b0cc70..9dacaac889 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -335,8 +335,10 @@ struct TTableFormatFileDesc {
}
struct TFileScanRangeParams {
+ // deprecated, move to TFileScanRange
1: optional Types.TFileType file_type;
2: optional TFileFormatType format_type;
+ // deprecated, move to TFileScanRange
3: optional TFileCompressType compress_type;
    // If this is for load job, src point to the source table and dest point to the doris table.
    // If this is for query, only dest_tuple_id is set, including both file slot and partition slot.
@@ -395,6 +397,8 @@ struct TFileRangeDesc {
8: optional TTableFormatFileDesc table_format_params
// Use modification time to determine whether the file is changed
9: optional i64 modification_time
+ 10: optional Types.TFileType file_type;
+ 11: optional TFileCompressType compress_type;
}
// TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.
@@ -402,6 +406,10 @@ struct TFileRangeDesc {
// list<TFileRangeDesc>: file location and range
struct TFileScanRange {
1: optional list<TFileRangeDesc> ranges
+    // If file_scan_params in TExecPlanFragmentParams is set in TExecPlanFragmentParams
+    // will use that field, otherwise, use this field.
+    // file_scan_params in TExecPlanFragmentParams will always be set in query request,
+    // and TFileScanRangeParams here is used for some other request such as fetch table schema for tvf.
2: optional TFileScanRangeParams params
}
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy
index 455d0333f4..cd8e1ee501 100644
--- a/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy
+++ b/regression-test/suites/external_table_emr_p2/hive/test_external_brown.groovy
@@ -285,7 +285,7 @@ suite("test_external_brown", "p2") {
if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
         String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
- String catalog_name = "external_yandex"
+ String catalog_name = "external_brown"
sql """drop catalog if exists ${catalog_name};"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]