This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new aa2d968ad54 branch-3.1: [Opt](MergeIO) make read slice size is 
configurable #55936 #57272 (#57159)
aa2d968ad54 is described below

commit aa2d968ad54070ef3d545b7aa18ad85fd1c348cd
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Oct 30 14:34:49 2025 +0800

    branch-3.1: [Opt](MergeIO) make read slice size is configurable #55936 
#57272 (#57159)
    
    bp #55936 #57272
    
    ---------
    
    Co-authored-by: shee <[email protected]>
    Co-authored-by: garenshi <[email protected]>
---
 be/src/io/fs/buffered_reader.cpp                            |  4 ++--
 be/src/io/fs/buffered_reader.h                              | 11 ++++++++++-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp          | 13 +++++++++----
 .../src/main/java/org/apache/doris/analysis/SetVar.java     | 10 ++++++++++
 .../src/main/java/org/apache/doris/qe/SessionVariable.java  |  8 ++++++++
 gensrc/thrift/PaloInternalService.thrift                    |  1 +
 6 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 7fd85caa43b..8733062827d 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -101,7 +101,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
 
     // merge small IO
     size_t merge_start = offset + has_read;
-    const size_t merge_end = merge_start + READ_SLICE_SIZE;
+    const size_t merge_end = merge_start + _merged_read_slice_size;
     // <slice_size, is_content>
     std::vector<std::pair<size_t, bool>> merged_slice;
     size_t content_size = 0;
@@ -310,7 +310,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& 
cached_data, size_t off
 Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, 
size_t to_read,
                                        size_t* bytes_read, const IOContext* 
io_ctx) {
     if (!_read_slice) {
-        _read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
+        _read_slice = std::make_unique<OwnedSlice>(_merged_read_slice_size);
     }
 
     *bytes_read = 0;
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 67e07665fbf..84d26abc1fb 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -275,7 +275,8 @@ public:
     static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
 
     MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
-                         const std::vector<PrefetchRange>& 
random_access_ranges)
+                         const std::vector<PrefetchRange>& 
random_access_ranges,
+                         int64_t merge_read_slice_size = READ_SLICE_SIZE)
             : _profile(profile),
               _reader(std::move(reader)),
               _random_access_ranges(random_access_ranges) {
@@ -288,6 +289,13 @@ public:
         // 1MB for oss, 8KB for hdfs
         _equivalent_io_size =
                 _is_oss ? config::merged_oss_min_io_size : 
config::merged_hdfs_min_io_size;
+
+        _merged_read_slice_size = merge_read_slice_size;
+
+        if (_merged_read_slice_size < 0) {
+            _merged_read_slice_size = READ_SLICE_SIZE;
+        }
+
         for (const PrefetchRange& range : _random_access_ranges) {
             _statistics.apply_bytes += range.end_offset - range.start_offset;
         }
@@ -388,6 +396,7 @@ private:
     bool _is_oss;
     double _max_amplified_ratio;
     size_t _equivalent_io_size;
+    int64_t _merged_read_slice_size;
 
     Statistics _statistics;
 };
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 56d88430e16..36f8028225d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -612,12 +612,17 @@ Status ParquetReader::_next_row_group_reader() {
         size_t avg_io_size = 0;
         const std::vector<io::PrefetchRange> io_ranges =
                 _generate_random_access_ranges(row_group_index, &avg_io_size);
+        int64_t merged_read_slice_size = -1;
+        if (_state != nullptr && 
_state->query_options().__isset.merge_read_slice_size) {
+            merged_read_slice_size = 
_state->query_options().merge_read_slice_size;
+        }
         // The underlying page reader will prefetch data in column.
         // Using both MergeRangeFileReader and BufferedStreamReader 
simultaneously would waste a lot of memory.
-        group_file_reader = avg_io_size < io::MergeRangeFileReader::SMALL_IO
-                                    ? 
std::make_shared<io::MergeRangeFileReader>(
-                                              _profile, _file_reader, 
io_ranges)
-                                    : _file_reader;
+        group_file_reader =
+                avg_io_size < io::MergeRangeFileReader::SMALL_IO
+                        ? std::make_shared<io::MergeRangeFileReader>(
+                                  _profile, _file_reader, io_ranges, 
merged_read_slice_size)
+                        : _file_reader;
     }
     _current_group_reader.reset(
             new RowGroupReader(_io_ctx ? 
std::make_shared<io::TracingFileReader>(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index e80860bf584..281fabc7b7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -197,6 +197,16 @@ public class SetVar {
             }
             this.result = (LiteralExpr) this.value;
         }
+        if 
(getVariable().equalsIgnoreCase(SessionVariable.MERGE_IO_READ_SLICE_SIZE_BYTES))
 {
+            try {
+                this.value = new StringLiteral(
+                        
Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
+            } catch (Throwable t) {
+                // See file_split_size comment
+                this.value = new StringLiteral(getResult().getStringValue());
+            }
+            this.result = (LiteralExpr) this.value;
+        }
         if (getVariable().equalsIgnoreCase("is_report_success")) {
             variable = SessionVariable.ENABLE_PROFILE;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 859877f374f..0f36635d8b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -751,6 +751,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String 
DEFAULT_VARIANT_MAX_SPARSE_COLUMN_STATISTICS_SIZE =
                                                             
"default_variant_max_sparse_column_statistics_size";
 
+    public static final String MERGE_IO_READ_SLICE_SIZE_BYTES = 
"merge_io_read_slice_size_bytes";
+
     public static final String ENABLE_PREFER_CACHED_ROWSET = 
"enable_prefer_cached_rowset";
     public static final String QUERY_FRESHNESS_TOLERANCE_MS = 
"query_freshness_tolerance_ms";
 
@@ -2269,6 +2271,11 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = QUERY_FRESHNESS_TOLERANCE_MS, needForward = 
false)
     public long queryFreshnessToleranceMs = -1;
 
+    @VariableMgr.VarAttr(name = MERGE_IO_READ_SLICE_SIZE_BYTES, description = {
+            "调整 READ_SLICE_SIZE 大小,降低 Merge IO 读放大影响",
+            "Make the READ_SLICE_SIZE variable configurable to reduce the 
impact caused by read amplification."})
+    public int mergeReadSliceSizeBytes = 8388608;
+
     public Set<Integer> getIgnoredRuntimeFilterIds() {
         Set<Integer> ids = Sets.newLinkedHashSet();
         if (ignoreRuntimeFilterIds.isEmpty()) {
@@ -4365,6 +4372,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setNewIsIpAddressInRange(newIsIpAddressInRange);
         
tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune);
+        tResult.setMergeReadSliceSize(mergeReadSliceSizeBytes);
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 2dfab470dd0..18ca4c02fc6 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -368,6 +368,7 @@ struct TQueryOptions {
 
   172: optional bool enable_prefer_cached_rowset
   173: optional i64 query_freshness_tolerance_ms
+  174: optional i64 merge_read_slice_size = 8388608;
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to