This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 93593a013d [feature](load) add segment bytes limit in segcompaction 
(#22526)
93593a013d is described below

commit 93593a013d6cb01a5518b83f3138776db9b009a4
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 4 18:00:52 2023 +0800

    [feature](load) add segment bytes limit in segcompaction (#22526)
---
 be/src/common/config.cpp                           |  21 +++--
 be/src/common/config.h                             |  21 +++--
 be/src/olap/olap_server.cpp                        |   4 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          | 104 ++++++++++-----------
 be/src/olap/rowset/beta_rowset_writer.h            |   5 +-
 be/test/olap/segcompaction_test.cpp                |  26 +++---
 docs/en/docs/admin-manual/config/be-config.md      |  32 ++++++-
 docs/en/docs/advanced/best-practice/compaction.md  |   2 +-
 docs/zh-CN/docs/admin-manual/config/be-config.md   |  40 +++++++-
 .../docs/advanced/best-practice/compaction.md      |   2 +-
 10 files changed, 165 insertions(+), 92 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9b409891ce..1c353eb61d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -910,14 +910,23 @@ DEFINE_Bool(hide_webserver_config_page, "false");
 
 DEFINE_Bool(enable_segcompaction, "true");
 
-// Trigger segcompaction if the num of segments in a rowset exceeds this 
threshold.
-DEFINE_Int32(segcompaction_threshold_segment_num, "10");
+// Max number of segments allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_batch_size, "10");
 
-// The segment whose row number above the threshold will be compacted during 
segcompaction
-DEFINE_Int32(segcompaction_small_threshold, "1048576");
+// Max row count allowed in a single source segment, bigger segments will be 
skipped.
+DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
 
-// This config can be set to limit thread number in  segcompaction thread pool.
-DEFINE_mInt32(segcompaction_max_threads, "10");
+// Max file size allowed in a single source segment, bigger segments will be 
skipped.
+DEFINE_Int64(segcompaction_candidate_max_bytes, "104857600");
+
+// Max total row count allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_task_max_rows, "1572864");
+
+// Max total file size allowed in a single segcompaction task.
+DEFINE_Int64(segcompaction_task_max_bytes, "157286400");
+
+// Global segcompaction thread pool size.
+DEFINE_mInt32(segcompaction_num_threads, "5");
 
 // enable java udf and jdbc scannode
 DEFINE_Bool(enable_java_support, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c32d42a5cb..958b17aa45 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -951,14 +951,23 @@ DECLARE_Bool(hide_webserver_config_page);
 
 DECLARE_Bool(enable_segcompaction);
 
-// Trigger segcompaction if the num of segments in a rowset exceeds this 
threshold.
-DECLARE_Int32(segcompaction_threshold_segment_num);
+// Max number of segments allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_batch_size);
 
-// The segment whose row number above the threshold will be compacted during 
segcompaction
-DECLARE_Int32(segcompaction_small_threshold);
+// Max row count allowed in a single source segment, bigger segments will be 
skipped.
+DECLARE_Int32(segcompaction_candidate_max_rows);
 
-// This config can be set to limit thread number in  segcompaction thread pool.
-DECLARE_mInt32(segcompaction_max_threads);
+// Max file size allowed in a single source segment, bigger segments will be 
skipped.
+DECLARE_Int64(segcompaction_candidate_max_bytes);
+
+// Max total row count allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_task_max_rows);
+
+// Max total file size allowed in a single segcompaction task.
+DECLARE_Int64(segcompaction_task_max_bytes);
+
+// Global segcompaction thread pool size.
+DECLARE_mInt32(segcompaction_num_threads);
 
 // enable java udf and jdbc scannode
 DECLARE_Bool(enable_java_support);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index c7862316fb..e293499424 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -132,8 +132,8 @@ Status StorageEngine::start_bg_threads() {
 
     if (config::enable_segcompaction) {
         ThreadPoolBuilder("SegCompactionTaskThreadPool")
-                .set_min_threads(config::segcompaction_max_threads)
-                .set_max_threads(config::segcompaction_max_threads)
+                .set_min_threads(config::segcompaction_num_threads)
+                .set_max_threads(config::segcompaction_num_threads)
                 .build(&_seg_compaction_thread_pool);
     }
     ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index e51a3fa66f..35fb2c8677 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -32,6 +32,7 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "gutil/strings/substitute.h"
+#include "io/fs/file_reader.h"
 #include "io/fs/file_reader_options.h"
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
@@ -170,27 +171,22 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t 
segment_id) {
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_load_noncompacted_segments(
-        std::vector<segment_v2::SegmentSharedPtr>* segments, size_t num) {
+Status 
BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& 
segment,
+                                                    int32_t segment_id) {
     auto fs = _rowset_meta->fs();
     if (!fs) {
         return Status::Error<INIT_FAILED>(
-                "BetaRowsetWriter::_load_noncompacted_segments 
_rowset_meta->fs get failed");
-    }
-    for (int seg_id = _segcompacted_point; seg_id < num; ++seg_id) {
-        auto seg_path =
-                BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, seg_id);
-        std::shared_ptr<segment_v2::Segment> segment;
-        auto type = config::enable_file_cache ? config::file_cache_type : "";
-        io::FileReaderOptions reader_options(io::cache_type_from_string(type),
-                                             io::SegmentCachePathPolicy());
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
-                                           _context.tablet_schema, 
reader_options, &segment);
-        if (!s.ok()) {
-            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
-            return s;
-        }
-        segments->push_back(std::move(segment));
+                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs 
get failed");
+    }
+    auto path = BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, segment_id);
+    auto type = config::enable_file_cache ? config::file_cache_type : "";
+    io::FileReaderOptions reader_options(io::cache_type_from_string(type),
+                                         io::SegmentCachePathPolicy());
+    auto s = segment_v2::Segment::open(fs, path, segment_id, rowset_id(), 
_context.tablet_schema,
+                                       reader_options, &segment);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to open segment. " << path << ":" << s;
+        return s;
     }
     return Status::OK();
 }
@@ -200,43 +196,46 @@ Status BetaRowsetWriter::_load_noncompacted_segments(
  *  2. if the consecutive smalls end up with a big, compact the smalls, except
  *     single small
  *  3. if the consecutive smalls end up with small, compact the smalls if the
- *     length is beyond (config::segcompaction_threshold_segment_num / 2)
+ *     length is beyond (config::segcompaction_batch_size / 2)
  */
 Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
-        SegCompactionCandidatesSharedPtr segments) {
-    std::vector<segment_v2::SegmentSharedPtr> all_segments;
-    // subtract one to skip last (maybe active) segment
-    RETURN_IF_ERROR(_load_noncompacted_segments(&all_segments, _num_segment - 
1));
-
-    if (VLOG_DEBUG_IS_ON) {
-        vlog_buffer.clear();
-        for (auto& segment : all_segments) {
-            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), 
segment->num_rows());
-        }
-        VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size()
-                   << " list of segments:" << fmt::to_string(vlog_buffer);
-    }
-
-    bool is_terminated_by_big = false;
-    bool let_big_terminate = false;
-    size_t small_threshold = config::segcompaction_small_threshold;
-    for (int64_t i = 0; i < all_segments.size(); ++i) {
-        segment_v2::SegmentSharedPtr seg = all_segments[i];
-        if (seg->num_rows() > small_threshold) {
-            if (let_big_terminate) {
-                is_terminated_by_big = true;
-                break;
-            } else {
+        SegCompactionCandidatesSharedPtr& segments) {
+    segments = std::make_shared<SegCompactionCandidates>();
+    // skip last (maybe active) segment
+    int32_t last_segment = _num_segment - 1;
+    size_t task_bytes = 0;
+    uint32_t task_rows = 0;
+    int32_t segid;
+    for (segid = _segcompacted_point;
+         segid < last_segment && segments->size() < 
config::segcompaction_batch_size; segid++) {
+        segment_v2::SegmentSharedPtr segment;
+        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
+        const auto segment_rows = segment->num_rows();
+        const auto segment_bytes = segment->file_reader()->size();
+        bool is_large_segment = segment_rows > 
config::segcompaction_candidate_max_rows ||
+                                segment_bytes > 
config::segcompaction_candidate_max_bytes;
+        if (is_large_segment) {
+            if (segid == _segcompacted_point) {
+                // skip large segments at the front
                 
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+                continue;
+            } else {
+                // stop because we need consecutive segments
+                break;
             }
-        } else {
-            let_big_terminate = true; // break if find a big after small
-            segments->push_back(seg);
         }
+        bool is_task_full = task_rows + segment_rows > 
config::segcompaction_task_max_rows ||
+                            task_bytes + segment_bytes > 
config::segcompaction_task_max_bytes;
+        if (is_task_full) {
+            break;
+        }
+        segments->push_back(segment);
+        task_rows += segment->num_rows();
+        task_bytes += segment->file_reader()->size();
     }
     size_t s = segments->size();
-    if (!is_terminated_by_big && s <= 
(config::segcompaction_threshold_segment_num / 2)) {
-        // start with big segments and end with small, better to do it in next
+    if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
+        // we didn't collect enough segments, better to do it in next
         // round to compact more at once
         segments->clear();
         return Status::OK();
@@ -371,9 +370,8 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     if (_segcompaction_status.load() != OK) {
         status = Status::Error<SEGCOMPACTION_FAILED>(
                 "BetaRowsetWriter::_segcompaction_if_necessary meet invalid 
state");
-    } else if ((_num_segment - _segcompacted_point) >=
-               config::segcompaction_threshold_segment_num) {
-        SegCompactionCandidatesSharedPtr segments = 
std::make_shared<SegCompactionCandidates>();
+    } else if ((_num_segment - _segcompacted_point) >= 
config::segcompaction_batch_size) {
+        SegCompactionCandidatesSharedPtr segments;
         status = _find_longest_consecutive_small_segment(segments);
         if (LIKELY(status.ok()) && (segments->size() > 0)) {
             LOG(INFO) << "submit segcompaction task, tablet_id:" << 
_context.tablet_id
@@ -410,9 +408,7 @@ Status 
BetaRowsetWriter::_segcompaction_rename_last_segments() {
     // currently we only rename remaining segments to reduce wait time
     // so that transaction can be committed ASAP
     VLOG_DEBUG << "segcompaction last few segments";
-    SegCompactionCandidates segments;
-    RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
-    for (int i = 0; i < segments.size(); ++i) {
+    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
         
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
     }
     return Status::OK();
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 8c635f3459..22bed4feff 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -162,9 +162,8 @@ private:
             std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end);
     Status _segcompaction_if_necessary();
     Status _segcompaction_rename_last_segments();
-    Status 
_load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
-                                       size_t num);
-    Status 
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr 
segments);
+    Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, 
int32_t segment_id);
+    Status 
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& 
segments);
     bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
 
     bool _check_and_set_is_doing_segcompaction();
diff --git a/be/test/olap/segcompaction_test.cpp 
b/be/test/olap/segcompaction_test.cpp
index 6a4476f423..2a894b9197 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -231,9 +231,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
     RowsetSharedPtr rowset;
     const int num_segments = 15;
     const uint32_t rows_per_segment = 4096;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
-    config::segcompaction_threshold_segment_num = 10;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 10;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -340,8 +340,8 @@ TEST_F(SegCompactionTest, 
SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
     create_tablet_schema(tablet_schema, DUP_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -484,8 +484,8 @@ TEST_F(SegCompactionTest, 
SegCompactionInterleaveWithBig_OoOoO) {
     create_tablet_schema(tablet_schema, DUP_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-    config::segcompaction_threshold_segment_num = 5;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+    config::segcompaction_batch_size = 5;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -607,9 +607,9 @@ TEST_F(SegCompactionTest, 
SegCompactionThenReadUniqueTableSmall) {
     create_tablet_schema(tablet_schema, UNIQUE_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
-    config::segcompaction_threshold_segment_num = 3;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 3;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -841,9 +841,9 @@ TEST_F(SegCompactionTest, 
SegCompactionThenReadAggTableSmall) {
     create_tablet_schema(tablet_schema, AGG_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
-    config::segcompaction_threshold_segment_num = 3;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 3;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index bfc61c56e9..f558d40118 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -629,18 +629,42 @@ BaseCompaction:546859:
 * Description: Enable to use segment compaction during loading to avoid -238 
error
 * Default value: true
 
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
 
 * Type: int32
-* Description: Trigger segcompaction if the num of segments in a rowset 
exceeds this threshold
+* Description: Max number of segments allowed in a single segcompaction task.
 * Default value: 10
 
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
 
 * Type: int32
-* Description: The segment whose row number above the threshold will be 
compacted during segcompaction
+* Description: Max row count allowed in a single source segment, bigger 
segments will be skipped.
 * Default value: 1048576
 
+#### `segcompaction_candidate_max_bytes`
+
+* Type: int64
+* Description: Max file size allowed in a single source segment, bigger 
segments will be skipped.
+* Default value: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* Type: int32
+* Description: Max total row count allowed in a single segcompaction task.
+* Default value: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* Type: int64
+* Description: Max total file size allowed in a single segcompaction task.
+* Default value: 157286400
+
+#### `segcompaction_num_threads`
+
+* Type: int32
+* Description: Global segcompaction thread pool size.
+* Default value: 5
+
 #### `disable_compaction_trace_log`
 
 * Type: bool
diff --git a/docs/en/docs/advanced/best-practice/compaction.md 
b/docs/en/docs/advanced/best-practice/compaction.md
index 5963a1bdfb..5168b38dab 100644
--- a/docs/en/docs/advanced/best-practice/compaction.md
+++ b/docs/en/docs/advanced/best-practice/compaction.md
@@ -59,7 +59,7 @@ The following features are provided by segment compaction:
 
 BE configuration:
 - `enable_segcompaction=true` turn it on.
-- `segcompaction_threshold_segment_num` is used to configure the interval for 
merging. The default value 10 means that every 10 segment files will trigger a 
segment compaction. It is recommended to set between 10 - 30. The larger value 
will increase the memory usage of segment compaction.
+- `segcompaction_batch_size` is used to configure the interval for merging. 
The default value 10 means that every 10 segment files will trigger a segment 
compaction. It is recommended to set between 10 - 30. The larger value will 
increase the memory usage of segment compaction.
 
 Situations where segment compaction is recommended:
 
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 7a3ac0315a..edb4e3e10d 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -643,18 +643,54 @@ BaseCompaction:546859:
 * 描述:在导入时进行 segment compaction 来减少 segment 数量, 以避免出现写入时的 -238 错误
 * 默认值:true
 
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
 
 * 类型:int32
 * 描述:单个 segment compaction 任务中允许的最大原始 segment 数量(同时,当未合并的 segment 数量达到该值时触发 segment compaction)
 * 默认值:10
 
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
 
 * 类型:int32
 * 描述:segment compaction 任务中允许的单个原始 segment 最大行数,行数过大的 segment 将被跳过
 * 默认值:1048576
 
+#### `segcompaction_batch_size`
+
+* 类型: int32
+* 描述: 单个 segment compaction 任务中的最大原始 segment 数量。
+* 默认值: 10
+
+#### `segcompaction_candidate_max_rows`
+
+* 类型: int32
+* 描述: segment compaction 任务中允许的单个原始 segment 行数,过大的 segment 将被跳过。
+* 默认值: 1048576
+
+#### `segcompaction_candidate_max_bytes`
+
+* 类型: int64
+* 描述: segment compaction 任务中允许的单个原始 segment 大小(字节),过大的 segment 将被跳过。
+* 默认值: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* 类型: int32
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总行数。
+* 默认值: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* 类型: int64
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总大小(字节)。
+* 默认值: 157286400
+
+#### `segcompaction_num_threads`
+
+* 类型: int32
+* 描述: segment compaction 线程池大小。
+* 默认值: 5
+
 #### `disable_compaction_trace_log`
 
 * 类型: bool
diff --git a/docs/zh-CN/docs/advanced/best-practice/compaction.md 
b/docs/zh-CN/docs/advanced/best-practice/compaction.md
index 342397c740..5b8562db50 100644
--- a/docs/zh-CN/docs/advanced/best-practice/compaction.md
+++ b/docs/zh-CN/docs/advanced/best-practice/compaction.md
@@ -57,7 +57,7 @@ Segment compaction 有以下特点:
 
 开启和配置方法(BE 配置):
 - `enable_segcompaction = true` 可以使能该功能
-- `segcompaction_threshold_segment_num` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 
文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
+- `segcompaction_batch_size` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次 
segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
 
 如有以下场景或问题,建议开启此功能:
 - 导入大量数据触发 OLAP_ERR_TOO_MANY_SEGMENTS (errcode -238) 错误导致导入失败。此时建议打开 segment 
compaction 的功能,在导入过程中对 segment 进行合并控制最终的数量。


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to