This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch tpch500 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b3305688c9e7cf2d618bb452589a4dd27cf3893d Author: kaka11chen <[email protected]> AuthorDate: Wed Dec 13 20:40:41 2023 +0800 [Configurations](multi-catalog) Add `enable_parquet_merge_small_io` and `enable_orc_merge_small_io` Session variables. --- be/src/vec/exec/format/orc/vorc_reader.cpp | 9 ++++- be/src/vec/exec/format/orc/vorc_reader.h | 7 ++-- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 10 +++--- be/src/vec/exec/format/parquet/vparquet_reader.h | 1 + docs/en/docs/advanced/variables.md | 8 +++++ docs/zh-CN/docs/advanced/variables.md | 8 +++++ .../java/org/apache/doris/qe/SessionVariable.java | 40 ++++++++++++++++++++++ gensrc/thrift/PaloInternalService.thrift | 4 +++ 8 files changed, 80 insertions(+), 7 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 16377027132..ef4cced0508 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -149,6 +149,7 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state, _is_hive(params.__isset.slot_name_to_schema_pos), _io_ctx(io_ctx), _enable_lazy_mat(enable_lazy_mat), + _enable_merge_small_io(state->query_options().enable_orc_merge_small_io), _is_dict_cols_converted(false) { TimezoneUtils::find_cctz_time_zone(ctz, _time_zone); VecDateTimeValue t; @@ -169,6 +170,7 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r _file_system(nullptr), _io_ctx(io_ctx), _enable_lazy_mat(enable_lazy_mat), + _enable_merge_small_io(true), _is_dict_cols_converted(false) { _init_system_properties(); _init_file_description(); @@ -236,7 +238,8 @@ Status OrcReader::_create_file_reader() { _profile, _system_properties, _file_description, reader_options, &_file_system, &inner_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); _file_input_stream.reset(new ORCFileInputStream(_scan_range.path, inner_reader, - &_statistics, _io_ctx, _profile)); + &_statistics, _io_ctx, 
_profile, + _enable_merge_small_io)); } if (_file_input_stream->getLength() == 0) { return Status::EndOfFile("empty orc file: " + _scan_range.path); @@ -2213,6 +2216,10 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column( void ORCFileInputStream::beforeReadStripe( std::unique_ptr<orc::StripeInformation> current_strip_information, std::vector<bool> selected_columns) { + if (!_enable_merge_small_io) { + _file_reader = _inner_reader; + return; + } // Generate prefetch ranges, build stripe file reader. uint64_t offset = current_strip_information->getOffset(); std::vector<io::PrefetchRange> prefetch_ranges; diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 366231deaee..964de043c61 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -529,6 +529,7 @@ private: io::IOContext* _io_ctx = nullptr; bool _enable_lazy_mat = true; + bool _enable_merge_small_io = true; std::vector<DecimalScaleParams> _decimal_scale_params; size_t _decimal_scale_params_index; @@ -559,13 +560,14 @@ class ORCFileInputStream : public orc::InputStream { public: ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader, OrcReader::Statistics* statistics, const io::IOContext* io_ctx, - RuntimeProfile* profile) + RuntimeProfile* profile, bool enable_merge_small_io) : _file_name(file_name), _inner_reader(inner_reader), _file_reader(inner_reader), _statistics(statistics), _io_ctx(io_ctx), - _profile(profile) {} + _profile(profile), + _enable_merge_small_io(enable_merge_small_io) {} ~ORCFileInputStream() override = default; @@ -588,6 +590,7 @@ private: OrcReader::Statistics* _statistics = nullptr; const io::IOContext* _io_ctx = nullptr; RuntimeProfile* _profile = nullptr; + bool _enable_merge_small_io = true; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 
87555f6ee86..9efb68c7f1a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -91,7 +91,8 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams _io_ctx(io_ctx), _state(state), _meta_cache(meta_cache), - _enable_lazy_mat(enable_lazy_mat) { + _enable_lazy_mat(enable_lazy_mat), + _enable_merge_small_io(state->query_options().enable_parquet_merge_small_io) { _init_profile(); _init_system_properties(); _init_file_description(); @@ -104,7 +105,8 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRang _scan_range(range), _io_ctx(io_ctx), _state(state), - _enable_lazy_mat(enable_lazy_mat) { + _enable_lazy_mat(enable_lazy_mat), + _enable_merge_small_io(state->query_options().enable_parquet_merge_small_io) { _init_system_properties(); _init_file_description(); } @@ -603,7 +605,7 @@ Status ParquetReader::_next_row_group_reader() { RowGroupReader::PositionDeleteContext position_delete_ctx = _get_position_delete_ctx(row_group, row_group_index); io::FileReaderSPtr group_file_reader; - if (typeid_cast<io::InMemoryFileReader*>(_file_reader.get())) { + if (typeid_cast<io::InMemoryFileReader*>(_file_reader.get()) || (!_enable_merge_small_io)) { // InMemoryFileReader has the ability to merge small IO group_file_reader = _file_reader; } else { @@ -927,4 +929,4 @@ int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& } return column.data_page_offset; } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 376b3791b07..a16b3eaa134 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -267,6 +267,7 @@ private: // Maybe null if not used FileMetaCache* _meta_cache = nullptr; bool _enable_lazy_mat = true; + bool 
_enable_merge_small_io = true; const TupleDescriptor* _tuple_descriptor = nullptr; const RowDescriptor* _row_descriptor = nullptr; const std::unordered_map<std::string, int>* _colname_to_slot_id = nullptr; diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index 030e228f523..45fa84f0f25 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -670,6 +670,14 @@ Note that the comment must start with /*+ and can only follow the SELECT. Controls whether to use lazy materialization technology in orc reader. The default value is true. +* `enable_parquet_merge_small_io` + + Controls whether to merge small range io in parquet reader. The default value is true. + +* `enable_orc_merge_small_io` + + Controls whether to merge small range io in orc reader. The default value is true. + * `enable_strong_consistency_read` Used to enable strong consistent reading. By default, Doris supports strong consistency within the same session, that is, changes to data within the same session are visible in real time. If you want strong consistent reads between sessions, set this variable to true. 
diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 2a697b01bf7..e40c6aa34ce 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -658,6 +658,14 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/ 控制 orc reader 是否启用延迟物化技术。默认为 true。 +* `enable_parquet_merge_small_io` + + 控制 parquet reader 是否启用小 IO 合并。默认为 true。 + +* `enable_orc_merge_small_io` + + 控制 orc reader 是否启用小 IO 合并。默认为 true。 + * `enable_strong_consistency_read` 用以开启强一致读。Doris 默认支持同一个会话内的强一致性,即同一个会话内对数据的变更操作是实时可见的。如需要会话间的强一致读,则需将此变量设置为true。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index df1f093cfb6..a73400af87b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -388,6 +388,10 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_ORC_LAZY_MAT = "enable_orc_lazy_materialization"; + public static final String ENABLE_PARQUET_MERGE_SMALL_IO = "enable_parquet_merge_small_io"; + + public static final String ENABLE_ORC_MERGE_SMALL_IO = "enable_orc_merge_small_io"; + public static final String INLINE_CTE_REFERENCED_THRESHOLD = "inline_cte_referenced_threshold"; public static final String ENABLE_CTE_MATERIALIZE = "enable_cte_materialize"; @@ -1498,6 +1502,23 @@ public class SessionVariable implements Serializable, Writable { "use other health replica when the use_fix_replica meet error" }) public boolean fallbackOtherReplicaWhenFixedCorrupt = false; + @VariableMgr.VarAttr( + name = ENABLE_PARQUET_MERGE_SMALL_IO, + description = {"控制 parquet reader 是否启用小 IO 合并。默认为 true。", + "Controls whether to merge small range io in parquet reader. 
" + + "The default value is true."}, + needForward = true) + public boolean enableParquetMergeSmallIO = true; + + + @VariableMgr.VarAttr( + name = ENABLE_ORC_MERGE_SMALL_IO, + description = {"控制 orc reader 是否启用小 IO 合并。默认为 true。", + "Controls whether to merge small range io in orc reader. " + + "The default value is true."}, + needForward = true) + public boolean enableOrcMergeSmallIO = true; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { @@ -2618,6 +2639,22 @@ public class SessionVariable implements Serializable, Writable { this.loadStreamPerNode = loadStreamPerNode; } + public boolean isEnableParquetMergeSmallIO() { + return enableParquetMergeSmallIO; + } + + public void setEnableParquetMergeSmallIO(boolean enableParquetMergeSmallIO) { + this.enableParquetMergeSmallIO = enableParquetMergeSmallIO; + } + + public boolean isEnableOrcMergeSmallIO() { + return enableOrcMergeSmallIO; + } + + public void setEnableOrcMergeSmallIO(boolean enableOrcMergeSmallIO) { + this.enableOrcMergeSmallIO = enableOrcMergeSmallIO; + } + /** * Serialize to thrift object. * Used for rest api. 
@@ -2728,6 +2765,9 @@ public class SessionVariable implements Serializable, Writable { tResult.setInvertedIndexSkipThreshold(invertedIndexSkipThreshold); + tResult.setEnableParquetMergeSmallIo(enableParquetMergeSmallIO); + tResult.setEnableOrcMergeSmallIo(enableOrcMergeSmallIO); + return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 177bec22059..c08cfa72765 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -263,6 +263,10 @@ struct TQueryOptions { 93: optional i32 inverted_index_max_expansions = 50; 94: optional i32 inverted_index_skip_threshold = 50; + + 95: optional bool enable_parquet_merge_small_io = true + + 96: optional bool enable_orc_merge_small_io = true } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
