This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new aa2d968ad54 branch-3.1: [Opt](MergeIO) make read slice size configurable #55936 #57272 (#57159)
aa2d968ad54 is described below
commit aa2d968ad54070ef3d545b7aa18ad85fd1c348cd
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Oct 30 14:34:49 2025 +0800
    branch-3.1: [Opt](MergeIO) make read slice size configurable #55936 #57272 (#57159)
bp #55936 #57272
---------
Co-authored-by: shee <[email protected]>
Co-authored-by: garenshi <[email protected]>
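In short: the 8MB READ_SLICE_SIZE buffer used by MergeRangeFileReader is no longer a hard-coded constant. A new session variable, merge_io_read_slice_size_bytes, is forwarded from the FE to the BE via TQueryOptions.merge_read_slice_size and passed into the reader's constructor. A minimal sketch of the resulting call shape (profile, inner_reader, and io_ranges are hypothetical placeholders, not names from this patch):

    // Hypothetical caller, mirroring the parquet reader change below.
    // A negative slice size means "not set" and falls back to READ_SLICE_SIZE.
    int64_t slice_size = 4 * 1024 * 1024; // e.g. 4MB instead of the 8MB default
    auto reader = std::make_shared<io::MergeRangeFileReader>(
            profile, inner_reader, io_ranges, slice_size);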
---
be/src/io/fs/buffered_reader.cpp | 4 ++--
be/src/io/fs/buffered_reader.h | 11 ++++++++++-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 13 +++++++++----
.../src/main/java/org/apache/doris/analysis/SetVar.java | 10 ++++++++++
.../src/main/java/org/apache/doris/qe/SessionVariable.java | 8 ++++++++
gensrc/thrift/PaloInternalService.thrift | 1 +
6 files changed, 40 insertions(+), 7 deletions(-)
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 7fd85caa43b..8733062827d 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -101,7 +101,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
// merge small IO
size_t merge_start = offset + has_read;
- const size_t merge_end = merge_start + READ_SLICE_SIZE;
+ const size_t merge_end = merge_start + _merged_read_slice_size;
// <slice_size, is_content>
std::vector<std::pair<size_t, bool>> merged_slice;
size_t content_size = 0;
@@ -310,7 +310,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
 Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
                                        size_t* bytes_read, const IOContext* io_ctx) {
if (!_read_slice) {
- _read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
+ _read_slice = std::make_unique<OwnedSlice>(_merged_read_slice_size);
}
*bytes_read = 0;
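Taken together, these two uses keep the reader consistent: _merged_read_slice_size now bounds the merge window computed in read_at_impl and also sizes the OwnedSlice backing buffer allocated in _fill_box.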
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 67e07665fbf..84d26abc1fb 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -275,7 +275,8 @@ public:
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
-                         const std::vector<PrefetchRange>& random_access_ranges)
+                         const std::vector<PrefetchRange>& random_access_ranges,
+                         int64_t merge_read_slice_size = READ_SLICE_SIZE)
: _profile(profile),
_reader(std::move(reader)),
_random_access_ranges(random_access_ranges) {
@@ -288,6 +289,13 @@ public:
// 1MB for oss, 8KB for hdfs
         _equivalent_io_size =
                 _is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;
+
+        _merged_read_slice_size = merge_read_slice_size;
+
+        if (_merged_read_slice_size < 0) {
+            _merged_read_slice_size = READ_SLICE_SIZE;
+        }
+
for (const PrefetchRange& range : _random_access_ranges) {
_statistics.apply_bytes += range.end_offset - range.start_offset;
}
@@ -388,6 +396,7 @@ private:
bool _is_oss;
double _max_amplified_ratio;
size_t _equivalent_io_size;
+ int64_t _merged_read_slice_size;
Statistics _statistics;
};
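Note the fallback in the constructor above: any negative merge_read_slice_size reverts to the compile-time READ_SLICE_SIZE default, so a query that never sets the session variable keeps the old behavior.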
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 56d88430e16..36f8028225d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -612,12 +612,17 @@ Status ParquetReader::_next_row_group_reader() {
size_t avg_io_size = 0;
const std::vector<io::PrefetchRange> io_ranges =
_generate_random_access_ranges(row_group_index, &avg_io_size);
+        int64_t merged_read_slice_size = -1;
+        if (_state != nullptr && _state->query_options().__isset.merge_read_slice_size) {
+            merged_read_slice_size = _state->query_options().merge_read_slice_size;
+        }
// The underlying page reader will prefetch data in column.
        // Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory.
-        group_file_reader = avg_io_size < io::MergeRangeFileReader::SMALL_IO
-                                    ? std::make_shared<io::MergeRangeFileReader>(
-                                              _profile, _file_reader, io_ranges)
-                                    : _file_reader;
+        group_file_reader =
+                avg_io_size < io::MergeRangeFileReader::SMALL_IO
+                        ? std::make_shared<io::MergeRangeFileReader>(
+                                  _profile, _file_reader, io_ranges, merged_read_slice_size)
+                        : _file_reader;
}
     _current_group_reader.reset(
             new RowGroupReader(_io_ctx ? std::make_shared<io::TracingFileReader>(
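If the session variable is absent from TQueryOptions, the -1 sentinel is passed through and the constructor's fallback applies; the merged reader itself is still only chosen when avg_io_size is below MergeRangeFileReader::SMALL_IO.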
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index e80860bf584..281fabc7b7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -197,6 +197,16 @@ public class SetVar {
}
this.result = (LiteralExpr) this.value;
}
+        if (getVariable().equalsIgnoreCase(SessionVariable.MERGE_IO_READ_SLICE_SIZE_BYTES)) {
+            try {
+                this.value = new StringLiteral(
+                        Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
+            } catch (Throwable t) {
+                // See file_split_size comment
+                this.value = new StringLiteral(getResult().getStringValue());
+            }
+            this.result = (LiteralExpr) this.value;
+        }
if (getVariable().equalsIgnoreCase("is_report_success")) {
variable = SessionVariable.ENABLE_PROFILE;
}
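As with file_split_size, the new variable's value is normalized through ParseUtil.analyzeDataVolume, so human-readable sizes such as "4MB" should be accepted and stored as a byte count; when parsing fails, the raw string is kept and left to later validation.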
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 859877f374f..0f36635d8b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -751,6 +751,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String DEFAULT_VARIANT_MAX_SPARSE_COLUMN_STATISTICS_SIZE =
             "default_variant_max_sparse_column_statistics_size";
+    public static final String MERGE_IO_READ_SLICE_SIZE_BYTES = "merge_io_read_slice_size_bytes";
+
     public static final String ENABLE_PREFER_CACHED_ROWSET = "enable_prefer_cached_rowset";
     public static final String QUERY_FRESHNESS_TOLERANCE_MS = "query_freshness_tolerance_ms";
@@ -2269,6 +2271,11 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = QUERY_FRESHNESS_TOLERANCE_MS, needForward = false)
public long queryFreshnessToleranceMs = -1;
+    @VariableMgr.VarAttr(name = MERGE_IO_READ_SLICE_SIZE_BYTES, description = {
+            "调整 READ_SLICE_SIZE 大小,降低 Merge IO 读放大影响",
+            "Make the READ_SLICE_SIZE variable configurable to reduce the impact caused by read amplification."})
+    public int mergeReadSliceSizeBytes = 8388608;
+
public Set<Integer> getIgnoredRuntimeFilterIds() {
Set<Integer> ids = Sets.newLinkedHashSet();
if (ignoreRuntimeFilterIds.isEmpty()) {
@@ -4365,6 +4372,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setNewIsIpAddressInRange(newIsIpAddressInRange);
tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune);
+ tResult.setMergeReadSliceSize(mergeReadSliceSizeBytes);
return tResult;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 2dfab470dd0..18ca4c02fc6 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -368,6 +368,7 @@ struct TQueryOptions {
172: optional bool enable_prefer_cached_rowset
173: optional i64 query_freshness_tolerance_ms
+ 174: optional i64 merge_read_slice_size = 8388608;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
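The thrift default of 8388608 (8MB) appears to match both the FE-side mergeReadSliceSizeBytes default above and the BE's READ_SLICE_SIZE constant, so all three layers should agree when the variable is left untouched.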