This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 93593a013d [feature](load) add segment bytes limit in segcompaction
(#22526)
93593a013d is described below
commit 93593a013d6cb01a5518b83f3138776db9b009a4
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 4 18:00:52 2023 +0800
[feature](load) add segment bytes limit in segcompaction (#22526)
---
be/src/common/config.cpp | 21 +++--
be/src/common/config.h | 21 +++--
be/src/olap/olap_server.cpp | 4 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 104 ++++++++++-----------
be/src/olap/rowset/beta_rowset_writer.h | 5 +-
be/test/olap/segcompaction_test.cpp | 26 +++---
docs/en/docs/admin-manual/config/be-config.md | 32 ++++++-
docs/en/docs/advanced/best-practice/compaction.md | 2 +-
docs/zh-CN/docs/admin-manual/config/be-config.md | 40 +++++++-
.../docs/advanced/best-practice/compaction.md | 2 +-
10 files changed, 165 insertions(+), 92 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9b409891ce..1c353eb61d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -910,14 +910,23 @@ DEFINE_Bool(hide_webserver_config_page, "false");
DEFINE_Bool(enable_segcompaction, "true");
-// Trigger segcompaction if the num of segments in a rowset exceeds this
threshold.
-DEFINE_Int32(segcompaction_threshold_segment_num, "10");
+// Max number of segments allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_batch_size, "10");
-// The segment whose row number above the threshold will be compacted during
segcompaction
-DEFINE_Int32(segcompaction_small_threshold, "1048576");
+// Max row count allowed in a single source segment, bigger segments will be
skipped.
+DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
-// This config can be set to limit thread number in segcompaction thread pool.
-DEFINE_mInt32(segcompaction_max_threads, "10");
+// Max file size allowed in a single source segment, bigger segments will be
skipped.
+DEFINE_Int64(segcompaction_candidate_max_bytes, "104857600");
+
+// Max total row count allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_task_max_rows, "1572864");
+
+// Max total file size allowed in a single segcompaction task.
+DEFINE_Int64(segcompaction_task_max_bytes, "157286400");
+
+// Global segcompaction thread pool size.
+DEFINE_mInt32(segcompaction_num_threads, "5");
// enable java udf and jdbc scannode
DEFINE_Bool(enable_java_support, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c32d42a5cb..958b17aa45 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -951,14 +951,23 @@ DECLARE_Bool(hide_webserver_config_page);
DECLARE_Bool(enable_segcompaction);
-// Trigger segcompaction if the num of segments in a rowset exceeds this
threshold.
-DECLARE_Int32(segcompaction_threshold_segment_num);
+// Max number of segments allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_batch_size);
-// The segment whose row number above the threshold will be compacted during
segcompaction
-DECLARE_Int32(segcompaction_small_threshold);
+// Max row count allowed in a single source segment, bigger segments will be
skipped.
+DECLARE_Int32(segcompaction_candidate_max_rows);
-// This config can be set to limit thread number in segcompaction thread pool.
-DECLARE_mInt32(segcompaction_max_threads);
+// Max file size allowed in a single source segment, bigger segments will be
skipped.
+DECLARE_Int64(segcompaction_candidate_max_bytes);
+
+// Max total row count allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_task_max_rows);
+
+// Max total file size allowed in a single segcompaction task.
+DECLARE_Int64(segcompaction_task_max_bytes);
+
+// Global segcompaction thread pool size.
+DECLARE_mInt32(segcompaction_num_threads);
// enable java udf and jdbc scannode
DECLARE_Bool(enable_java_support);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index c7862316fb..e293499424 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -132,8 +132,8 @@ Status StorageEngine::start_bg_threads() {
if (config::enable_segcompaction) {
ThreadPoolBuilder("SegCompactionTaskThreadPool")
- .set_min_threads(config::segcompaction_max_threads)
- .set_max_threads(config::segcompaction_max_threads)
+ .set_min_threads(config::segcompaction_num_threads)
+ .set_max_threads(config::segcompaction_num_threads)
.build(&_seg_compaction_thread_pool);
}
ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index e51a3fa66f..35fb2c8677 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -32,6 +32,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "gutil/strings/substitute.h"
+#include "io/fs/file_reader.h"
#include "io/fs/file_reader_options.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
@@ -170,27 +171,22 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t
segment_id) {
return Status::OK();
}
-Status BetaRowsetWriter::_load_noncompacted_segments(
- std::vector<segment_v2::SegmentSharedPtr>* segments, size_t num) {
+Status
BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr&
segment,
+ int32_t segment_id) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>(
- "BetaRowsetWriter::_load_noncompacted_segments
_rowset_meta->fs get failed");
- }
- for (int seg_id = _segcompacted_point; seg_id < num; ++seg_id) {
- auto seg_path =
- BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, seg_id);
- std::shared_ptr<segment_v2::Segment> segment;
- auto type = config::enable_file_cache ? config::file_cache_type : "";
- io::FileReaderOptions reader_options(io::cache_type_from_string(type),
- io::SegmentCachePathPolicy());
- auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
- _context.tablet_schema,
reader_options, &segment);
- if (!s.ok()) {
- LOG(WARNING) << "failed to open segment. " << seg_path << ":" <<
s.to_string();
- return s;
- }
- segments->push_back(std::move(segment));
+ "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs
get failed");
+ }
+ auto path = BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, segment_id);
+ auto type = config::enable_file_cache ? config::file_cache_type : "";
+ io::FileReaderOptions reader_options(io::cache_type_from_string(type),
+ io::SegmentCachePathPolicy());
+ auto s = segment_v2::Segment::open(fs, path, segment_id, rowset_id(),
_context.tablet_schema,
+ reader_options, &segment);
+ if (!s.ok()) {
+ LOG(WARNING) << "failed to open segment. " << path << ":" << s;
+ return s;
}
return Status::OK();
}
@@ -200,43 +196,46 @@ Status BetaRowsetWriter::_load_noncompacted_segments(
* 2. if the consecutive smalls end up with a big, compact the smalls, except
* single small
* 3. if the consecutive smalls end up with small, compact the smalls if the
- * length is beyond (config::segcompaction_threshold_segment_num / 2)
+ * length is beyond (config::segcompaction_batch_size / 2)
*/
Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
- SegCompactionCandidatesSharedPtr segments) {
- std::vector<segment_v2::SegmentSharedPtr> all_segments;
- // subtract one to skip last (maybe active) segment
- RETURN_IF_ERROR(_load_noncompacted_segments(&all_segments, _num_segment -
1));
-
- if (VLOG_DEBUG_IS_ON) {
- vlog_buffer.clear();
- for (auto& segment : all_segments) {
- fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(),
segment->num_rows());
- }
- VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size()
- << " list of segments:" << fmt::to_string(vlog_buffer);
- }
-
- bool is_terminated_by_big = false;
- bool let_big_terminate = false;
- size_t small_threshold = config::segcompaction_small_threshold;
- for (int64_t i = 0; i < all_segments.size(); ++i) {
- segment_v2::SegmentSharedPtr seg = all_segments[i];
- if (seg->num_rows() > small_threshold) {
- if (let_big_terminate) {
- is_terminated_by_big = true;
- break;
- } else {
+ SegCompactionCandidatesSharedPtr& segments) {
+ segments = std::make_shared<SegCompactionCandidates>();
+ // skip last (maybe active) segment
+ int32_t last_segment = _num_segment - 1;
+ size_t task_bytes = 0;
+ uint32_t task_rows = 0;
+ int32_t segid;
+ for (segid = _segcompacted_point;
+ segid < last_segment && segments->size() <
config::segcompaction_batch_size; segid++) {
+ segment_v2::SegmentSharedPtr segment;
+ RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
+ const auto segment_rows = segment->num_rows();
+ const auto segment_bytes = segment->file_reader()->size();
+ bool is_large_segment = segment_rows >
config::segcompaction_candidate_max_rows ||
+ segment_bytes >
config::segcompaction_candidate_max_bytes;
+ if (is_large_segment) {
+ if (segid == _segcompacted_point) {
+ // skip large segments at the front
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+ continue;
+ } else {
+ // stop because we need consecutive segments
+ break;
}
- } else {
- let_big_terminate = true; // break if find a big after small
- segments->push_back(seg);
}
+ bool is_task_full = task_rows + segment_rows >
config::segcompaction_task_max_rows ||
+ task_bytes + segment_bytes >
config::segcompaction_task_max_bytes;
+ if (is_task_full) {
+ break;
+ }
+ segments->push_back(segment);
+ task_rows += segment->num_rows();
+ task_bytes += segment->file_reader()->size();
}
size_t s = segments->size();
- if (!is_terminated_by_big && s <=
(config::segcompaction_threshold_segment_num / 2)) {
- // start with big segments and end with small, better to do it in next
+ if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
+ // we didn't collect enough segments, better to do it in next
// round to compact more at once
segments->clear();
return Status::OK();
@@ -371,9 +370,8 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
if (_segcompaction_status.load() != OK) {
status = Status::Error<SEGCOMPACTION_FAILED>(
"BetaRowsetWriter::_segcompaction_if_necessary meet invalid
state");
- } else if ((_num_segment - _segcompacted_point) >=
- config::segcompaction_threshold_segment_num) {
- SegCompactionCandidatesSharedPtr segments =
std::make_shared<SegCompactionCandidates>();
+ } else if ((_num_segment - _segcompacted_point) >=
config::segcompaction_batch_size) {
+ SegCompactionCandidatesSharedPtr segments;
status = _find_longest_consecutive_small_segment(segments);
if (LIKELY(status.ok()) && (segments->size() > 0)) {
LOG(INFO) << "submit segcompaction task, tablet_id:" <<
_context.tablet_id
@@ -410,9 +408,7 @@ Status
BetaRowsetWriter::_segcompaction_rename_last_segments() {
// currently we only rename remaining segments to reduce wait time
// so that transaction can be committed ASAP
VLOG_DEBUG << "segcompaction last few segments";
- SegCompactionCandidates segments;
- RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
- for (int i = 0; i < segments.size(); ++i) {
+ for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
}
return Status::OK();
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 8c635f3459..22bed4feff 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -162,9 +162,8 @@ private:
std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end);
Status _segcompaction_if_necessary();
Status _segcompaction_rename_last_segments();
- Status
_load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
- size_t num);
- Status
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr
segments);
+ Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id);
+ Status
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr&
segments);
bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
bool _check_and_set_is_doing_segcompaction();
diff --git a/be/test/olap/segcompaction_test.cpp
b/be/test/olap/segcompaction_test.cpp
index 6a4476f423..2a894b9197 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -231,9 +231,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
RowsetSharedPtr rowset;
const int num_segments = 15;
const uint32_t rows_per_segment = 4096;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
- config::segcompaction_threshold_segment_num = 10;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 10;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -340,8 +340,8 @@ TEST_F(SegCompactionTest,
SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
create_tablet_schema(tablet_schema, DUP_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -484,8 +484,8 @@ TEST_F(SegCompactionTest,
SegCompactionInterleaveWithBig_OoOoO) {
create_tablet_schema(tablet_schema, DUP_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- config::segcompaction_threshold_segment_num = 5;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ config::segcompaction_batch_size = 5;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -607,9 +607,9 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadUniqueTableSmall) {
create_tablet_schema(tablet_schema, UNIQUE_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
- config::segcompaction_threshold_segment_num = 3;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 3;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -841,9 +841,9 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadAggTableSmall) {
create_tablet_schema(tablet_schema, AGG_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
- config::segcompaction_threshold_segment_num = 3;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 3;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index bfc61c56e9..f558d40118 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -629,18 +629,42 @@ BaseCompaction:546859:
* Description: Enable to use segment compaction during loading to avoid -238
error
* Default value: true
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
* Type: int32
-* Description: Trigger segcompaction if the num of segments in a rowset
exceeds this threshold
+* Description: Max number of segments allowed in a single segcompaction task.
* Default value: 10
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
* Type: int32
-* Description: The segment whose row number above the threshold will be
compacted during segcompaction
+* Description: Max row count allowed in a single source segment, bigger
segments will be skipped.
* Default value: 1048576
+#### `segcompaction_candidate_max_bytes`
+
+* Type: int64
+* Description: Max file size allowed in a single source segment, bigger
segments will be skipped.
+* Default value: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* Type: int32
+* Description: Max total row count allowed in a single segcompaction task.
+* Default value: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* Type: int64
+* Description: Max total file size allowed in a single segcompaction task.
+* Default value: 157286400
+
+#### `segcompaction_num_threads`
+
+* Type: int32
+* Description: Global segcompaction thread pool size.
+* Default value: 5
+
#### `disable_compaction_trace_log`
* Type: bool
diff --git a/docs/en/docs/advanced/best-practice/compaction.md
b/docs/en/docs/advanced/best-practice/compaction.md
index 5963a1bdfb..5168b38dab 100644
--- a/docs/en/docs/advanced/best-practice/compaction.md
+++ b/docs/en/docs/advanced/best-practice/compaction.md
@@ -59,7 +59,7 @@ The following features are provided by segment compaction:
BE configuration:
- `enable_segcompaction=true` turn it on.
-- `segcompaction_threshold_segment_num` is used to configure the interval for
merging. The default value 10 means that every 10 segment files will trigger a
segment compaction. It is recommended to set between 10 - 30. The larger value
will increase the memory usage of segment compaction.
+- `segcompaction_batch_size` is used to configure the interval for merging.
The default value 10 means that every 10 segment files will trigger a segment
compaction. It is recommended to set between 10 - 30. The larger value will
increase the memory usage of segment compaction.
Situations where segment compaction is recommended:
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 7a3ac0315a..edb4e3e10d 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -643,18 +643,42 @@ BaseCompaction:546859:
* 描述:在导入时进行 segment compaction 来减少 segment 数量, 以避免出现写入时的 -238 错误
* 默认值:true
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
* 类型:int32
-* 描述:当 segment 数量超过此阈值时触发 segment compaction
+* 描述:单个 segment compaction 任务中的最大原始 segment 数量。
* 默认值:10
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
* 类型:int32
-* 描述:当 segment 的行数超过此大小时则会在 segment compaction 时被 compact,否则跳过
+* 描述:segment compaction 任务中允许的单个原始 segment 行数,过大的 segment 将被跳过。
* 默认值:1048576
+#### `segcompaction_candidate_max_bytes`
+
+* 类型: int64
+* 描述: segment compaction 任务中允许的单个原始 segment 大小(字节),过大的 segment 将被跳过。
+* 默认值: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* 类型: int32
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总行数。
+* 默认值: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* 类型: int64
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总大小(字节)。
+* 默认值: 157286400
+
+#### `segcompaction_num_threads`
+
+* 类型: int32
+* 描述: segment compaction 线程池大小。
+* 默认值: 5
+
#### `disable_compaction_trace_log`
* 类型: bool
diff --git a/docs/zh-CN/docs/advanced/best-practice/compaction.md
b/docs/zh-CN/docs/advanced/best-practice/compaction.md
index 342397c740..5b8562db50 100644
--- a/docs/zh-CN/docs/advanced/best-practice/compaction.md
+++ b/docs/zh-CN/docs/advanced/best-practice/compaction.md
@@ -57,7 +57,7 @@ Segment compaction 有以下特点:
开启和配置方法(BE 配置):
- `enable_segcompaction = true` 可以使能该功能
-- `segcompaction_threshold_segment_num` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment
文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
+- `segcompaction_batch_size` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次
segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
如有以下场景或问题,建议开启此功能:
- 导入大量数据触发 OLAP_ERR_TOO_MANY_SEGMENTS (errcode -238) 错误导致导入失败。此时建议打开 segment
compaction 的功能,在导入过程中对 segment 进行合并控制最终的数量。
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]