This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 6e4fd993652 branch-4.0: [improvement](rowset) Aggregate non-MOW
segment key bounds (#64305)
6e4fd993652 is described below
commit 6e4fd99365214bf3f4525730e95979bb007f04ad
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jun 24 18:20:54 2026 +0800
branch-4.0: [improvement](rowset) Aggregate non-MOW segment key bounds
(#64305)
Pick apache/doris#62604

Note: keep enable_aggregate_non_mow_key_bounds disabled by default for
upgrade/downgrade safety.
---
be/src/cloud/cloud_snapshot_mgr.cpp | 8 +
be/src/cloud/pb_convert.cpp | 28 +++-
be/src/common/config.cpp | 4 +
be/src/common/config.h | 4 +
be/src/olap/base_tablet.cpp | 13 +-
be/src/olap/compaction.cpp | 10 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 9 +-
be/src/olap/rowset/rowset.h | 4 +
be/src/olap/rowset/rowset_meta.cpp | 33 +++-
be/src/olap/rowset/rowset_meta.h | 16 +-
be/src/olap/snapshot_manager.cpp | 16 +-
be/src/olap/snapshot_manager.h | 2 +-
be/src/olap/task/index_builder.cpp | 5 +-
be/test/olap/rowset/rowset_meta_test.cpp | 152 ++++++++++++++++++
.../olap/segments_key_bounds_truncation_test.cpp | 7 +
gensrc/proto/olap_file.proto | 10 ++
.../test_non_mow_key_bounds_aggregation.groovy | 177 +++++++++++++++++++++
17 files changed, 478 insertions(+), 20 deletions(-)
diff --git a/be/src/cloud/cloud_snapshot_mgr.cpp
b/be/src/cloud/cloud_snapshot_mgr.cpp
index 7d49579ea01..b715b47ff9b 100644
--- a/be/src/cloud/cloud_snapshot_mgr.cpp
+++ b/be/src/cloud/cloud_snapshot_mgr.cpp
@@ -276,6 +276,14 @@ Status CloudSnapshotMgr::_create_rowset_meta(
for (const auto& key_bound : source_meta_pb.segments_key_bounds()) {
*new_rowset_meta_pb->add_segments_key_bounds() = key_bound;
}
+ if (source_meta_pb.has_segments_key_bounds_truncated()) {
+ new_rowset_meta_pb->set_segments_key_bounds_truncated(
+ source_meta_pb.segments_key_bounds_truncated());
+ }
+ if (source_meta_pb.has_segments_key_bounds_aggregated()) {
+ new_rowset_meta_pb->set_segments_key_bounds_aggregated(
+ source_meta_pb.segments_key_bounds_aggregated());
+ }
if (source_meta_pb.has_delete_predicate()) {
DeletePredicatePB* new_delete_condition =
new_rowset_meta_pb->mutable_delete_predicate();
*new_delete_condition = source_meta_pb.delete_predicate();
diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index 1f9d087ee45..114d09739f3 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -79,7 +79,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out,
const RowsetMetaPB& in)
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
- out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ if (in.has_segments_key_bounds_truncated()) {
+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ }
+ if (in.has_segments_key_bounds_aggregated()) {
+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+ }
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
@@ -157,7 +162,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out,
RowsetMetaPB&& in) {
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
- out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ if (in.has_segments_key_bounds_truncated()) {
+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ }
+ if (in.has_segments_key_bounds_aggregated()) {
+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+ }
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
@@ -247,7 +257,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const
RowsetMetaCloudPB& in)
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
- out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ if (in.has_segments_key_bounds_truncated()) {
+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ }
+ if (in.has_segments_key_bounds_aggregated()) {
+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+ }
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
@@ -325,7 +340,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out,
RowsetMetaCloudPB&& in) {
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
- out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ if (in.has_segments_key_bounds_truncated()) {
+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+ }
+ if (in.has_segments_key_bounds_aggregated()) {
+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+ }
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 07171f75b5d..a67e77f5e41 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1624,6 +1624,10 @@ DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas,
"false");
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "36");
// ATTENTION: for test only, use random segments key bounds truncation
threshold every time
DEFINE_mBool(random_segments_key_bounds_truncation, "false");
+
+// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
+// key-bounds entry instead of per-segment bounds, to reduce meta size on
cloud FDB.
+DEFINE_mBool(enable_aggregate_non_mow_key_bounds, "false");
// p0, daily, rqg, external
DEFINE_String(fuzzy_test_type, "");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7652d49a911..3b4f487bde2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1716,6 +1716,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
// ATTENTION: for test only, use random segments key bounds truncation
threshold every time
DECLARE_mBool(random_segments_key_bounds_truncation);
+// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
+// key-bounds entry instead of per-segment bounds, to reduce meta size on
cloud FDB.
+DECLARE_mBool(enable_aggregate_non_mow_key_bounds);
+
DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);
DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 021708284be..4fb0c310343 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -465,7 +465,18 @@ Status BaseTablet::lookup_row_key(const Slice&
encoded_key, TabletSchema* latest
std::vector<KeyBoundsPB> segments_key_bounds;
rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds);
int num_segments = cast_set<int>(rs->num_segments());
- DCHECK_EQ(segments_key_bounds.size(), num_segments);
+ // MOW lookup requires per-segment bounds. Aggregation must be disabled
+ // for MOW writers, but enforce at runtime too — indexing
segments_key_bounds[j]
+ // below would be out-of-bounds otherwise.
+ if (UNLIKELY(rs->rowset_meta()->is_segments_key_bounds_aggregated() ||
+ static_cast<int>(segments_key_bounds.size()) !=
num_segments)) {
+ return Status::InternalError(
+ "MOW lookup got rowset with inconsistent
segments_key_bounds, rowset_id={}, "
+ "aggregated={}, bounds_size={}, num_segments={}",
+ rs->rowset_id().to_string(),
+ rs->rowset_meta()->is_segments_key_bounds_aggregated(),
+ segments_key_bounds.size(), num_segments);
+ }
std::vector<uint32_t> picked_segments;
for (int j = num_segments - 1; j >= 0; j--) {
if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c168818f3a9..4891ebbbb2b 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -418,6 +418,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
// link data to new rowset
auto seg_id = 0;
bool segments_key_bounds_truncated {false};
+ bool any_input_aggregated {false};
std::vector<KeyBoundsPB> segment_key_bounds;
std::vector<uint32_t> num_segment_rows;
for (auto rowset : _input_rowsets) {
@@ -425,6 +426,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
_output_rs_writer->rowset_id(),
seg_id));
seg_id += rowset->num_segments();
segments_key_bounds_truncated |=
rowset->is_segments_key_bounds_truncated();
+ any_input_aggregated |=
rowset->rowset_meta()->is_segments_key_bounds_aggregated();
std::vector<KeyBoundsPB> key_bounds;
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
segment_key_bounds.insert(segment_key_bounds.end(),
key_bounds.begin(), key_bounds.end());
@@ -444,7 +446,13 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
rowset_meta->set_segments_overlap(NONOVERLAPPING);
rowset_meta->set_rowset_state(VISIBLE);
rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
- rowset_meta->set_segments_key_bounds(segment_key_bounds);
+ // If any input was already aggregated we have no way to recover
per-segment
+ // bounds, so force aggregation on the output to keep the layout consistent
+ // with `num_segments` / the aggregated flag, even if the config is off
now.
+ bool aggregate_key_bounds =
+ any_input_aggregated ||
(config::enable_aggregate_non_mow_key_bounds &&
+
!_tablet->enable_unique_key_merge_on_write());
+ rowset_meta->set_segments_key_bounds(segment_key_bounds,
aggregate_key_bounds);
rowset_meta->set_num_segment_rows(num_segment_rows);
_output_rowset = _output_rs_writer->manual_build(rowset_meta);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index bdb3dd9075e..a96c497cdaf 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -138,7 +138,10 @@ void build_rowset_meta_with_spec_field(RowsetMeta&
rowset_meta,
spec_rowset_meta.is_segments_key_bounds_truncated());
std::vector<KeyBoundsPB> segments_key_bounds;
spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
- rowset_meta.set_segments_key_bounds(segments_key_bounds);
+ // Preserve source layout: if source was aggregated (size 1),
re-aggregating
+ // the single entry is a no-op that also keeps the flag consistent.
+ rowset_meta.set_segments_key_bounds(segments_key_bounds,
+
spec_rowset_meta.is_segments_key_bounds_aggregated());
std::vector<uint32_t> num_segment_rows;
spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
rowset_meta.set_num_segment_rows(num_segment_rows);
@@ -1074,7 +1077,9 @@ Status
BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
_total_index_size);
rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
- rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
+ bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds &&
+ !_context.enable_unique_key_merge_on_write;
+ rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds,
aggregate_key_bounds);
// TODO write zonemap to meta
rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
rowset_meta->set_creation_time(time(nullptr));
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 2d2a6267ff8..d02727bd3cf 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -302,6 +302,10 @@ public:
return _rowset_meta->is_segments_key_bounds_truncated();
}
+ bool is_segments_key_bounds_aggregated() const {
+ return _rowset_meta->is_segments_key_bounds_aggregated();
+ }
+
bool check_rowset_segment();
[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
diff --git a/be/src/olap/rowset/rowset_meta.cpp
b/be/src/olap/rowset/rowset_meta.cpp
index fd3647ca8e7..7744a382d75 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -291,11 +291,31 @@ int64_t RowsetMeta::segment_file_size(int seg_id) const {
: -1;
}
-void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>&
segments_key_bounds) {
- for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
- KeyBoundsPB* new_key_bounds =
_rowset_meta_pb.add_segments_key_bounds();
- *new_key_bounds = key_bounds;
+void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>&
segments_key_bounds,
+ bool aggregate_into_single) {
+ _rowset_meta_pb.clear_segments_key_bounds();
+ bool did_aggregate = aggregate_into_single && !segments_key_bounds.empty();
+ if (did_aggregate) {
+ const std::string* overall_min =
&segments_key_bounds.front().min_key();
+ const std::string* overall_max =
&segments_key_bounds.front().max_key();
+ for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
+ if (key_bounds.min_key() < *overall_min) {
+ overall_min = &key_bounds.min_key();
+ }
+ if (key_bounds.max_key() > *overall_max) {
+ overall_max = &key_bounds.max_key();
+ }
+ }
+ KeyBoundsPB* aggregated = _rowset_meta_pb.add_segments_key_bounds();
+ aggregated->set_min_key(*overall_min);
+ aggregated->set_max_key(*overall_max);
+ } else {
+ for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
+ KeyBoundsPB* new_key_bounds =
_rowset_meta_pb.add_segments_key_bounds();
+ *new_key_bounds = key_bounds;
+ }
}
+ set_segments_key_bounds_aggregated(did_aggregate);
int32_t truncation_threshold =
config::segments_key_bounds_truncation_threshold;
if (config::random_segments_key_bounds_truncation) {
@@ -328,6 +348,11 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta&
other) {
set_total_disk_size(data_disk_size() + index_disk_size());
set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
other.is_segments_key_bounds_truncated());
+ // merge_rowset_meta is used in the MOW partial-update publish path, which
relies
+ // on per-segment bounds. Aggregation should never be enabled for MOW
rowsets,
+ // so we do not expect either side to be aggregated here.
+ DCHECK(!is_segments_key_bounds_aggregated() &&
!other.is_segments_key_bounds_aggregated())
+ << "merge_rowset_meta encountered aggregated key bounds";
if (_rowset_meta_pb.num_segment_rows_size() > 0) {
if (other.num_segments() > 0) {
if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 9f052cf5e10..fcbd1f1f701 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -355,6 +355,17 @@ public:
_rowset_meta_pb.set_segments_key_bounds_truncated(truncated);
}
+ // When true, `segments_key_bounds` holds a single aggregated
+ // [rowset_min, rowset_max] entry instead of per-segment bounds.
+ bool is_segments_key_bounds_aggregated() const {
+ return _rowset_meta_pb.has_segments_key_bounds_aggregated() &&
+ _rowset_meta_pb.segments_key_bounds_aggregated();
+ }
+
+ void set_segments_key_bounds_aggregated(bool aggregated) {
+ _rowset_meta_pb.set_segments_key_bounds_aggregated(aggregated);
+ }
+
bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
// for compatibility, old version has not segment key bounds
if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
@@ -372,7 +383,10 @@ public:
return true;
}
- void set_segments_key_bounds(const std::vector<KeyBoundsPB>&
segments_key_bounds);
+ // If `aggregate_into_single` is true, collapse per-segment bounds into a
single
+ // [rowset_min, rowset_max] entry and mark this rowset as aggregated.
+ void set_segments_key_bounds(const std::vector<KeyBoundsPB>&
segments_key_bounds,
+ bool aggregate_into_single = false);
void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) {
*_rowset_meta_pb.add_segments_key_bounds() =
std::move(segments_key_bounds);
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index b28aa478356..450a0701822 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -227,8 +227,9 @@ Result<std::vector<PendingRowsetGuard>>
SnapshotManager::convert_rowset_ids(
// src be local rowset
RowsetId rowset_id = _engine.next_rowset_id();
guards.push_back(_engine.pending_local_rowsets().add(rowset_id));
- RETURN_IF_ERROR_RESULT(_rename_rowset_id(visible_rowset,
clone_dir, tablet_schema,
- rowset_id, rowset_meta));
+ RETURN_IF_ERROR_RESULT(_rename_rowset_id(
+ visible_rowset, clone_dir, tablet_schema, rowset_id,
rowset_meta,
+ new_tablet_meta_pb.enable_unique_key_merge_on_write()));
RowsetId src_rs_id;
if (visible_rowset.rowset_id() > 0) {
src_rs_id.init(visible_rowset.rowset_id());
@@ -269,8 +270,9 @@ Result<std::vector<PendingRowsetGuard>>
SnapshotManager::convert_rowset_ids(
// src be local rowset
RowsetId rowset_id = _engine.next_rowset_id();
guards.push_back(_engine.pending_local_rowsets().add(rowset_id));
- RETURN_IF_ERROR_RESULT(_rename_rowset_id(stale_rowset, clone_dir,
tablet_schema,
- rowset_id, rowset_meta));
+ RETURN_IF_ERROR_RESULT(_rename_rowset_id(
+ stale_rowset, clone_dir, tablet_schema, rowset_id,
rowset_meta,
+ new_tablet_meta_pb.enable_unique_key_merge_on_write()));
RowsetId src_rs_id;
if (stale_rowset.rowset_id() > 0) {
src_rs_id.init(stale_rowset.rowset_id());
@@ -324,7 +326,8 @@ Result<std::vector<PendingRowsetGuard>>
SnapshotManager::convert_rowset_ids(
Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
const std::string& new_tablet_path,
TabletSchemaSPtr tablet_schema,
const RowsetId& rowset_id,
- RowsetMetaPB* new_rs_meta_pb) {
+ RowsetMetaPB* new_rs_meta_pb,
+ bool
enable_unique_key_merge_on_write) {
Status st = Status::OK();
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
rowset_meta->init_from_pb(rs_meta_pb);
@@ -350,6 +353,9 @@ Status SnapshotManager::_rename_rowset_id(const
RowsetMetaPB& rs_meta_pb,
context.newest_write_timestamp = org_rowset_meta->newest_write_timestamp();
// keep segments_overlap same as origin rowset
context.segments_overlap = rowset_meta->segments_overlap();
+ // propagate MOW flag so that non-MOW key-bounds aggregation is not applied
+ // when restoring a MOW tablet's rowset
+ context.enable_unique_key_merge_on_write =
enable_unique_key_merge_on_write;
auto rs_writer = DORIS_TRY(RowsetFactory::create_rowset_writer(_engine,
context, false));
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 01303dac4f2..f63c62215d5 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -132,7 +132,7 @@ private:
Status _rename_rowset_id(const RowsetMetaPB& rs_meta_pb, const
std::string& new_tablet_path,
TabletSchemaSPtr tablet_schema, const RowsetId&
next_id,
- RowsetMetaPB* new_rs_meta_pb);
+ RowsetMetaPB* new_rs_meta_pb, bool
enable_unique_key_merge_on_write);
Status _rename_index_ids(TabletSchemaPB& schema_pb,
const TabletSchemaSPtr& tablet_schema) const;
diff --git a/be/src/olap/task/index_builder.cpp
b/be/src/olap/task/index_builder.cpp
index 1dc0c5e7565..2744f953ec9 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -307,7 +307,10 @@ Status IndexBuilder::update_inverted_index_info() {
RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds));
rowset_meta->set_segments_key_bounds_truncated(
input_rowset_meta->is_segments_key_bounds_truncated());
- rowset_meta->set_segments_key_bounds(key_bounds);
+ // preserve aggregated layout via the setter so the aggregated flag is
not
+ // clobbered by set_segments_key_bounds's default reset path.
+ rowset_meta->set_segments_key_bounds(
+ key_bounds,
input_rowset_meta->is_segments_key_bounds_aggregated());
std::vector<uint32_t> num_segment_rows;
input_rowset_meta->get_num_segment_rows(&num_segment_rows);
rowset_meta->set_num_segment_rows(num_segment_rows);
diff --git a/be/test/olap/rowset/rowset_meta_test.cpp
b/be/test/olap/rowset/rowset_meta_test.cpp
index c78b5803f03..768ff00169e 100644
--- a/be/test/olap/rowset/rowset_meta_test.cpp
+++ b/be/test/olap/rowset/rowset_meta_test.cpp
@@ -303,4 +303,156 @@ TEST_F(RowsetMetaTest, TestMergeRowsetMetaBothEmpty) {
EXPECT_EQ(rowset_meta_1.num_segments(), 5);
}
+TEST_F(RowsetMetaTest, TestSegmentsKeyBoundsAggregation) {
+ auto make_bounds = [](std::string min_key, std::string max_key) {
+ KeyBoundsPB kb;
+ kb.set_min_key(std::move(min_key));
+ kb.set_max_key(std::move(max_key));
+ return kb;
+ };
+
+ // Prepare three per-segment bounds whose overall min is "a01" and overall
max is "z99".
+ // Intentionally unordered so that the aggregation must scan all entries.
+ std::vector<KeyBoundsPB> per_segment;
+ per_segment.push_back(make_bounds("m50", "z99"));
+ per_segment.push_back(make_bounds("a01", "k10"));
+ per_segment.push_back(make_bounds("f20", "r80"));
+
+ // Save and restore truncation config to keep the test deterministic.
+ int32_t saved_truncation =
config::segments_key_bounds_truncation_threshold;
+ config::segments_key_bounds_truncation_threshold = -1;
+ auto restore = std::shared_ptr<void>(nullptr, [&](void*) {
+ config::segments_key_bounds_truncation_threshold = saved_truncation;
+ });
+
+ // 1. aggregate=true -> single [overall_min, overall_max] entry, flag set.
+ {
+ RowsetMeta rs_meta;
+ rs_meta.set_num_segments(per_segment.size());
+ rs_meta.set_segments_key_bounds(per_segment,
/*aggregate_into_single=*/true);
+
+ std::vector<KeyBoundsPB> out;
+ rs_meta.get_segments_key_bounds(&out);
+ ASSERT_EQ(out.size(), 1);
+ EXPECT_EQ(out[0].min_key(), "a01");
+ EXPECT_EQ(out[0].max_key(), "z99");
+ EXPECT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+
+ // first_key/last_key must still return the global min/max.
+ KeyBoundsPB first;
+ KeyBoundsPB last;
+ ASSERT_TRUE(rs_meta.get_first_segment_key_bound(&first));
+ ASSERT_TRUE(rs_meta.get_last_segment_key_bound(&last));
+ EXPECT_EQ(first.min_key(), "a01");
+ EXPECT_EQ(last.max_key(), "z99");
+ }
+
+ // 2. aggregate=false (default) -> per-segment entries preserved, flag
unset.
+ {
+ RowsetMeta rs_meta;
+ rs_meta.set_num_segments(per_segment.size());
+ rs_meta.set_segments_key_bounds(per_segment);
+
+ std::vector<KeyBoundsPB> out;
+ rs_meta.get_segments_key_bounds(&out);
+ ASSERT_EQ(out.size(), per_segment.size());
+ EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+ for (size_t i = 0; i < per_segment.size(); ++i) {
+ EXPECT_EQ(out[i].min_key(), per_segment[i].min_key());
+ EXPECT_EQ(out[i].max_key(), per_segment[i].max_key());
+ }
+ }
+
+ // 3. aggregate=true with empty input -> nothing written, flag set to false.
+ {
+ RowsetMeta rs_meta;
+ rs_meta.set_segments_key_bounds({}, /*aggregate_into_single=*/true);
+
+ std::vector<KeyBoundsPB> out;
+ rs_meta.get_segments_key_bounds(&out);
+ EXPECT_EQ(out.size(), 0);
+ EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+ }
+
+ // 4. aggregate=true called twice -> result reflects the latest call only.
+ {
+ RowsetMeta rs_meta;
+ rs_meta.set_segments_key_bounds(per_segment,
/*aggregate_into_single=*/true);
+ std::vector<KeyBoundsPB> second;
+ second.push_back(make_bounds("b00", "c00"));
+ rs_meta.set_segments_key_bounds(second,
/*aggregate_into_single=*/true);
+
+ std::vector<KeyBoundsPB> out;
+ rs_meta.get_segments_key_bounds(&out);
+ ASSERT_EQ(out.size(), 1);
+ EXPECT_EQ(out[0].min_key(), "b00");
+ EXPECT_EQ(out[0].max_key(), "c00");
+ EXPECT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+ }
+
+ // 5. aggregated flag must be reset when switching from aggregate=true to
+ // aggregate=false on the same instance.
+ {
+ RowsetMeta rs_meta;
+ rs_meta.set_segments_key_bounds(per_segment,
/*aggregate_into_single=*/true);
+ ASSERT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+
+ rs_meta.set_segments_key_bounds(per_segment,
/*aggregate_into_single=*/false);
+ EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+
+ std::vector<KeyBoundsPB> out;
+ rs_meta.get_segments_key_bounds(&out);
+ EXPECT_EQ(out.size(), per_segment.size());
+ }
+
+ // 6. aggregated flag must be reset when calling with aggregate=true but an
+ // empty input after a prior aggregated call.
+ {
+ RowsetMeta rs_meta;
+ rs_meta.set_segments_key_bounds(per_segment,
/*aggregate_into_single=*/true);
+ ASSERT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+
+ rs_meta.set_segments_key_bounds({}, /*aggregate_into_single=*/true);
+ EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+
+ std::vector<KeyBoundsPB> out;
+ rs_meta.get_segments_key_bounds(&out);
+ EXPECT_TRUE(out.empty());
+ }
+}
+
+TEST_F(RowsetMetaTest, TestSegmentsKeyBoundsAggregationTruncation) {
+ // Aggregated entry is still subject to truncation.
+ int32_t saved_truncation =
config::segments_key_bounds_truncation_threshold;
+ bool saved_random = config::random_segments_key_bounds_truncation;
+ config::segments_key_bounds_truncation_threshold = 4;
+ config::random_segments_key_bounds_truncation = false;
+ auto restore = std::shared_ptr<void>(nullptr, [&](void*) {
+ config::segments_key_bounds_truncation_threshold = saved_truncation;
+ config::random_segments_key_bounds_truncation = saved_random;
+ });
+
+ auto make_bounds = [](std::string min_key, std::string max_key) {
+ KeyBoundsPB kb;
+ kb.set_min_key(std::move(min_key));
+ kb.set_max_key(std::move(max_key));
+ return kb;
+ };
+
+ std::vector<KeyBoundsPB> per_segment;
+ per_segment.push_back(make_bounds("aaaaaaa", "bbbbbbb"));
+ per_segment.push_back(make_bounds("ccccccc", "ddddddd"));
+
+ RowsetMeta rs_meta;
+ rs_meta.set_segments_key_bounds(per_segment,
/*aggregate_into_single=*/true);
+
+ std::vector<KeyBoundsPB> out;
+ rs_meta.get_segments_key_bounds(&out);
+ ASSERT_EQ(out.size(), 1);
+ EXPECT_EQ(out[0].min_key(), std::string("aaaa"));
+ EXPECT_EQ(out[0].max_key(), std::string("dddd"));
+ EXPECT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+ EXPECT_TRUE(rs_meta.is_segments_key_bounds_truncated());
+}
+
} // namespace doris
diff --git a/be/test/olap/segments_key_bounds_truncation_test.cpp
b/be/test/olap/segments_key_bounds_truncation_test.cpp
index faca751270b..e87212ecfa8 100644
--- a/be/test/olap/segments_key_bounds_truncation_test.cpp
+++ b/be/test/olap/segments_key_bounds_truncation_test.cpp
@@ -48,6 +48,7 @@ private:
std::string absolute_dir;
std::unique_ptr<DataDir> data_dir;
int cur_version {2};
+ bool _saved_aggregate_non_mow_key_bounds {false};
public:
void SetUp() override {
@@ -61,9 +62,15 @@ public:
data_dir = std::make_unique<DataDir>(*engine_ref, kSegmentDir);
ASSERT_TRUE(data_dir->update_capacity().ok());
ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+
+ // This suite asserts per-segment key bounds; force the non-MOW
aggregation
+ // feature off so it is isolated from that orthogonal code path.
+ _saved_aggregate_non_mow_key_bounds =
config::enable_aggregate_non_mow_key_bounds;
+ config::enable_aggregate_non_mow_key_bounds = false;
}
void TearDown() override {
+ config::enable_aggregate_non_mow_key_bounds =
_saved_aggregate_non_mow_key_bounds;
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kSegmentDir).ok());
engine_ref = nullptr;
ExecEnv::GetInstance()->set_storage_engine(nullptr);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 073185a25f6..390f50e583c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -140,6 +140,11 @@ message RowsetMetaPB {
// rows count for each segment
repeated int64 num_segment_rows = 56;
+ // If true, `segments_key_bounds` contains a single aggregated
+ // [rowset_min_key, rowset_max_key] entry instead of per-segment bounds.
+ // Only applies to non-MOW rowsets to reduce meta size on cloud FDB.
+ optional bool segments_key_bounds_aggregated = 57;
+
// For cloud
// for data recycling
optional int64 txn_expiration = 1000;
@@ -244,6 +249,11 @@ message RowsetMetaCloudPB {
// rows count for each segment
repeated int64 num_segment_rows = 56;
+ // If true, `segments_key_bounds` contains a single aggregated
+ // [rowset_min_key, rowset_max_key] entry instead of per-segment bounds.
+ // Only applies to non-MOW rowsets to reduce meta size on cloud FDB.
+ optional bool segments_key_bounds_aggregated = 57;
+
// cloud
// the field is a vector, rename it
repeated int64 segments_file_size = 100;
diff --git
a/regression-test/suites/data_model_p0/duplicate/test_non_mow_key_bounds_aggregation.groovy
b/regression-test/suites/data_model_p0/duplicate/test_non_mow_key_bounds_aggregation.groovy
new file mode 100644
index 00000000000..5cacee3e4e6
--- /dev/null
+++
b/regression-test/suites/data_model_p0/duplicate/test_non_mow_key_bounds_aggregation.groovy
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Http
+
+// Verifies that when `enable_aggregate_non_mow_key_bounds` is on, non-MOW
+// rowsets collapse per-segment key bounds into a single [min, max] entry
+// while MOW rowsets keep per-segment bounds regardless of the config.
+suite("test_non_mow_key_bounds_aggregation", "nonConcurrent") {
+
+ // see be/src/util/key_util.h:50
+ // Single 0x02 byte that prefixes each encoded key; the assertions below
+ // expect every bound to equal keyNormalMarker + raw key string.
+ def keyNormalMarker = new String(new Byte[]{2})
+
+ // Returns the first rs_meta entry whose end_version == version from the
+ // tablet in row 0 of SHOW TABLETS, or null if none is found. Only row 0
+ // is read, which is sufficient for the BUCKETS 1 tables created below.
+ // In cloud mode it re-polls up to 60 times, 1s apart (presumably because
+ // the new rowset meta may not be visible immediately -- confirm).
+ def fetchRowsetMetaAtVersion = { String table, int version ->
+ def metaUrl = sql_return_maparray("show tablets from
${table};").get(0).MetaUrl
+ def jsonMeta = Http.GET(metaUrl, true, false)
+ for (def meta : jsonMeta.rs_metas) {
+ if (meta.end_version as int == version) {
+ return meta
+ }
+ }
+ if (isCloudMode()) {
+ for (int i = 0; i < 60; i++) {
+ Thread.sleep(1000)
+ jsonMeta = Http.GET(metaUrl, true, false)
+ for (def meta : jsonMeta.rs_metas) {
+ if (meta.end_version as int == version) {
+ return meta
+ }
+ }
+ }
+ }
+ return null
+ }
+
+ def dupTable = "test_non_mow_key_bounds_aggregation_dup"
+ def mowTable = "test_non_mow_key_bounds_aggregation_mow"
+
+ sql """ DROP TABLE IF EXISTS ${dupTable} force; """
+ sql """ CREATE TABLE ${dupTable} (
+ `k` varchar(65533) NOT NULL,
+ `v` int)
+ DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
+ "disable_auto_compaction" = "true"); """
+
+ sql """ DROP TABLE IF EXISTS ${mowTable} force; """
+ sql """ CREATE TABLE ${mowTable} (
+ `k` varchar(65533) NOT NULL,
+ `v` int)
+ UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true"); """
+
+ int dupVersion = 1
+ int mowVersion = 1
+
+ // Disable truncation so bounds content equals the raw keys (with marker
prefix).
+ // Enable aggregation to exercise the new code path for non-MOW rowsets.
+ def configOn = [
+ segments_key_bounds_truncation_threshold: -1,
+ enable_aggregate_non_mow_key_bounds : true,
+ ]
+
+ setBeConfigTemporary(configOn) {
+ // Case 1: non-MOW with config on -> aggregated single [min, max]
entry.
+ String dupK1 = "aaaaaaaa"
+ String dupK2 = "mmmmmmmm"
+ String dupK3 = "zzzzzzzz"
+ sql """insert into ${dupTable} values("${dupK1}", 1), ("${dupK3}", 3),
("${dupK2}", 2);"""
+ def dupMeta = fetchRowsetMetaAtVersion(dupTable, ++dupVersion)
+ assertNotNull(dupMeta)
+ logger.info("dup rowset meta (config on): ${dupMeta}")
+ assertTrue(dupMeta.segments_key_bounds_aggregated as boolean)
+ assertEquals(1, dupMeta.segments_key_bounds.size())
+ assertEquals(keyNormalMarker + dupK1,
dupMeta.segments_key_bounds.get(0).min_key)
+ assertEquals(keyNormalMarker + dupK3,
dupMeta.segments_key_bounds.get(0).max_key)
+
+ // Case 2: MOW with config on -> per-segment bounds preserved, flag
unset.
+ String mowK1 = "mmmm1111"
+ String mowK2 = "mmmm2222"
+ sql """insert into ${mowTable} values("${mowK1}", 1), ("${mowK2}",
2);"""
+ def mowMeta = fetchRowsetMetaAtVersion(mowTable, ++mowVersion)
+ assertNotNull(mowMeta)
+ logger.info("mow rowset meta (config on): ${mowMeta}")
+ assertFalse((mowMeta.segments_key_bounds_aggregated ?: false) as
boolean)
+ assertEquals(mowMeta.num_segments as int,
mowMeta.segments_key_bounds.size())
+ }
+
+ // Case 3: non-MOW with config off -> per-segment bounds (size ==
num_segments), flag unset.
+ def configOff = [
+ segments_key_bounds_truncation_threshold: -1,
+ enable_aggregate_non_mow_key_bounds : false,
+ ]
+ setBeConfigTemporary(configOff) {
+ String dupK1 = "bbbb0000"
+ String dupK2 = "bbbb9999"
+ sql """insert into ${dupTable} values("${dupK1}", 10), ("${dupK2}",
20);"""
+ def dupMeta = fetchRowsetMetaAtVersion(dupTable, ++dupVersion)
+ assertNotNull(dupMeta)
+ logger.info("dup rowset meta (config off): ${dupMeta}")
+ assertFalse((dupMeta.segments_key_bounds_aggregated ?: false) as
boolean)
+ assertEquals(dupMeta.num_segments as int,
dupMeta.segments_key_bounds.size())
+ }
+
+ // Case 4: rebuilding an already-aggregated non-MOW rowset through the
index rewrite
+ // path must preserve the aggregated flag and single-entry layout.
Regression guard for
+ // IndexBuilder clobbering the flag via set_segments_key_bounds's default
parameter.
+ // IndexBuilder runs on local rowsets only, so skip this case in cloud
mode.
+ if (!isCloudMode()) {
+ def idxTable = "test_non_mow_key_bounds_aggregation_idx"
+ sql """ DROP TABLE IF EXISTS ${idxTable} force; """
+ sql """ CREATE TABLE ${idxTable} (
+ `k` varchar(65533) NOT NULL,
+ `v` int,
+ INDEX idx_v (v) USING INVERTED)
+ DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
+ "disable_auto_compaction" = "true",
+ "light_schema_change" = "true"); """
+
+ // Polls the newest SHOW ALTER TABLE COLUMN job for tbl once per second,
+ // for at most timeoutSec iterations; throws on CANCELLED or timeout.
+ // Column 9 is assumed to be the job State -- TODO confirm.
+ // NOTE(review): an empty result is treated as "finished". If DROP INDEX
+ // on an inverted index is tracked elsewhere (e.g. SHOW BUILD INDEX),
+ // this returns at once without waiting for IndexBuilder -- verify.
+ def waitAlterFinish = { String tbl, int timeoutSec ->
+ for (int i = 0; i < timeoutSec; i++) {
+ def rs = sql """SHOW ALTER TABLE COLUMN WHERE TableName =
"${tbl}" ORDER BY CreateTime DESC LIMIT 1;"""
+ if (rs.size() == 0 || rs[0][9] == "FINISHED") {
+ return
+ }
+ if (rs[0][9] == "CANCELLED") {
+ throw new IllegalStateException("alter cancelled: ${rs}")
+ }
+ Thread.sleep(1000)
+ }
+ throw new IllegalStateException("waitAlterFinish timeout")
+ }
+
+ setBeConfigTemporary(configOn) {
+ String idxK1 = "idx00000"
+ String idxK2 = "idxzzzzz"
+ sql """insert into ${idxTable} values("${idxK1}", 1), ("${idxK2}",
2);"""
+ // First load into the freshly created table; same numbering as
+ // dupVersion/mowVersion above (start at 1, first insert -> 2).
+ int idxVersion = 2
+ def beforeMeta = fetchRowsetMetaAtVersion(idxTable, idxVersion)
+ assertNotNull(beforeMeta)
+ logger.info("idx rowset meta before DROP INDEX: ${beforeMeta}")
+ assertTrue(beforeMeta.segments_key_bounds_aggregated as boolean)
+ assertEquals(1, beforeMeta.segments_key_bounds.size())
+
+ // DROP INDEX drives IndexBuilder, which rebuilds rowset meta in
place.
+ sql """ALTER TABLE ${idxTable} DROP INDEX idx_v;"""
+ waitAlterFinish(idxTable, 120)
+
+ def afterMeta = fetchRowsetMetaAtVersion(idxTable, idxVersion)
+ assertNotNull(afterMeta)
+ logger.info("idx rowset meta after DROP INDEX: ${afterMeta}")
+ assertTrue(afterMeta.segments_key_bounds_aggregated as boolean)
+ assertEquals(1, afterMeta.segments_key_bounds.size())
+ assertEquals(beforeMeta.segments_key_bounds.get(0).min_key,
+ afterMeta.segments_key_bounds.get(0).min_key)
+ assertEquals(beforeMeta.segments_key_bounds.get(0).max_key,
+ afterMeta.segments_key_bounds.get(0).max_key)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]