This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6e4fd993652 branch-4.0: [improvement](rowset) Aggregate non-MOW 
segment key bounds (#64305)
6e4fd993652 is described below

commit 6e4fd99365214bf3f4525730e95979bb007f04ad
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jun 24 18:20:54 2026 +0800

    branch-4.0: [improvement](rowset) Aggregate non-MOW segment key bounds 
(#64305)
    
    Pick apache/doris#62604
    
    Note: keep enable_aggregate_non_mow_key_bounds disabled by default for
    upgrade/downgrade safety.
---
 be/src/cloud/cloud_snapshot_mgr.cpp                |   8 +
 be/src/cloud/pb_convert.cpp                        |  28 +++-
 be/src/common/config.cpp                           |   4 +
 be/src/common/config.h                             |   4 +
 be/src/olap/base_tablet.cpp                        |  13 +-
 be/src/olap/compaction.cpp                         |  10 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          |   9 +-
 be/src/olap/rowset/rowset.h                        |   4 +
 be/src/olap/rowset/rowset_meta.cpp                 |  33 +++-
 be/src/olap/rowset/rowset_meta.h                   |  16 +-
 be/src/olap/snapshot_manager.cpp                   |  16 +-
 be/src/olap/snapshot_manager.h                     |   2 +-
 be/src/olap/task/index_builder.cpp                 |   5 +-
 be/test/olap/rowset/rowset_meta_test.cpp           | 152 ++++++++++++++++++
 .../olap/segments_key_bounds_truncation_test.cpp   |   7 +
 gensrc/proto/olap_file.proto                       |  10 ++
 .../test_non_mow_key_bounds_aggregation.groovy     | 177 +++++++++++++++++++++
 17 files changed, 478 insertions(+), 20 deletions(-)

diff --git a/be/src/cloud/cloud_snapshot_mgr.cpp 
b/be/src/cloud/cloud_snapshot_mgr.cpp
index 7d49579ea01..b715b47ff9b 100644
--- a/be/src/cloud/cloud_snapshot_mgr.cpp
+++ b/be/src/cloud/cloud_snapshot_mgr.cpp
@@ -276,6 +276,14 @@ Status CloudSnapshotMgr::_create_rowset_meta(
     for (const auto& key_bound : source_meta_pb.segments_key_bounds()) {
         *new_rowset_meta_pb->add_segments_key_bounds() = key_bound;
     }
+    if (source_meta_pb.has_segments_key_bounds_truncated()) {
+        new_rowset_meta_pb->set_segments_key_bounds_truncated(
+                source_meta_pb.segments_key_bounds_truncated());
+    }
+    if (source_meta_pb.has_segments_key_bounds_aggregated()) {
+        new_rowset_meta_pb->set_segments_key_bounds_aggregated(
+                source_meta_pb.segments_key_bounds_aggregated());
+    }
     if (source_meta_pb.has_delete_predicate()) {
         DeletePredicatePB* new_delete_condition = 
new_rowset_meta_pb->mutable_delete_predicate();
         *new_delete_condition = source_meta_pb.delete_predicate();
diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index 1f9d087ee45..114d09739f3 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -79,7 +79,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, 
const RowsetMetaPB& in)
     }
     out->set_txn_expiration(in.txn_expiration());
     out->set_segments_overlap_pb(in.segments_overlap_pb());
-    out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    if (in.has_segments_key_bounds_truncated()) {
+        
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    }
+    if (in.has_segments_key_bounds_aggregated()) {
+        
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+    }
     out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
     out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
     out->set_index_id(in.index_id());
@@ -157,7 +162,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, 
RowsetMetaPB&& in) {
     }
     out->set_txn_expiration(in.txn_expiration());
     out->set_segments_overlap_pb(in.segments_overlap_pb());
-    out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    if (in.has_segments_key_bounds_truncated()) {
+        
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    }
+    if (in.has_segments_key_bounds_aggregated()) {
+        
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+    }
     out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
     out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
     out->set_index_id(in.index_id());
@@ -247,7 +257,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const 
RowsetMetaCloudPB& in)
     }
     out->set_txn_expiration(in.txn_expiration());
     out->set_segments_overlap_pb(in.segments_overlap_pb());
-    out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    if (in.has_segments_key_bounds_truncated()) {
+        
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    }
+    if (in.has_segments_key_bounds_aggregated()) {
+        
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+    }
     out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
     out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
     out->set_index_id(in.index_id());
@@ -325,7 +340,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, 
RowsetMetaCloudPB&& in) {
     }
     out->set_txn_expiration(in.txn_expiration());
     out->set_segments_overlap_pb(in.segments_overlap_pb());
-    out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    if (in.has_segments_key_bounds_truncated()) {
+        
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
+    }
+    if (in.has_segments_key_bounds_aggregated()) {
+        
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
+    }
     out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
     out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
     out->set_index_id(in.index_id());
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 07171f75b5d..a67e77f5e41 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1624,6 +1624,10 @@ DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, 
"false");
 DEFINE_mInt32(segments_key_bounds_truncation_threshold, "36");
 // ATTENTION: for test only, use random segments key bounds truncation 
threshold every time
 DEFINE_mBool(random_segments_key_bounds_truncation, "false");
+
+// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
+// key-bounds entry instead of per-segment bounds, to reduce meta size on 
cloud FDB.
+DEFINE_mBool(enable_aggregate_non_mow_key_bounds, "false");
 // p0, daily, rqg, external
 DEFINE_String(fuzzy_test_type, "");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7652d49a911..3b4f487bde2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1716,6 +1716,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
 // ATTENTION: for test only, use random segments key bounds truncation 
threshold every time
 DECLARE_mBool(random_segments_key_bounds_truncation);
 
+// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
+// key-bounds entry instead of per-segment bounds, to reduce meta size on 
cloud FDB.
+DECLARE_mBool(enable_aggregate_non_mow_key_bounds);
+
 DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);
 
 DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 021708284be..4fb0c310343 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -465,7 +465,18 @@ Status BaseTablet::lookup_row_key(const Slice& 
encoded_key, TabletSchema* latest
         std::vector<KeyBoundsPB> segments_key_bounds;
         rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds);
         int num_segments = cast_set<int>(rs->num_segments());
-        DCHECK_EQ(segments_key_bounds.size(), num_segments);
+        // MOW lookup requires per-segment bounds. Aggregation must be disabled
+        // for MOW writers, but enforce at runtime too — indexing 
segments_key_bounds[j]
+        // below would be out-of-bounds otherwise.
+        if (UNLIKELY(rs->rowset_meta()->is_segments_key_bounds_aggregated() ||
+                     static_cast<int>(segments_key_bounds.size()) != 
num_segments)) {
+            return Status::InternalError(
+                    "MOW lookup got rowset with inconsistent 
segments_key_bounds, rowset_id={}, "
+                    "aggregated={}, bounds_size={}, num_segments={}",
+                    rs->rowset_id().to_string(),
+                    rs->rowset_meta()->is_segments_key_bounds_aggregated(),
+                    segments_key_bounds.size(), num_segments);
+        }
         std::vector<uint32_t> picked_segments;
         for (int j = num_segments - 1; j >= 0; j--) {
             if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c168818f3a9..4891ebbbb2b 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -418,6 +418,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
     // link data to new rowset
     auto seg_id = 0;
     bool segments_key_bounds_truncated {false};
+    bool any_input_aggregated {false};
     std::vector<KeyBoundsPB> segment_key_bounds;
     std::vector<uint32_t> num_segment_rows;
     for (auto rowset : _input_rowsets) {
@@ -425,6 +426,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
                                               _output_rs_writer->rowset_id(), 
seg_id));
         seg_id += rowset->num_segments();
         segments_key_bounds_truncated |= 
rowset->is_segments_key_bounds_truncated();
+        any_input_aggregated |= 
rowset->rowset_meta()->is_segments_key_bounds_aggregated();
         std::vector<KeyBoundsPB> key_bounds;
         RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
         segment_key_bounds.insert(segment_key_bounds.end(), 
key_bounds.begin(), key_bounds.end());
@@ -444,7 +446,13 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
     rowset_meta->set_segments_overlap(NONOVERLAPPING);
     rowset_meta->set_rowset_state(VISIBLE);
     
rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
-    rowset_meta->set_segments_key_bounds(segment_key_bounds);
+    // If any input was already aggregated we have no way to recover 
per-segment
+    // bounds, so force aggregation on the output to keep the layout consistent
+    // with `num_segments` / the aggregated flag, even if the config is off 
now.
+    bool aggregate_key_bounds =
+            any_input_aggregated || 
(config::enable_aggregate_non_mow_key_bounds &&
+                                     
!_tablet->enable_unique_key_merge_on_write());
+    rowset_meta->set_segments_key_bounds(segment_key_bounds, 
aggregate_key_bounds);
     rowset_meta->set_num_segment_rows(num_segment_rows);
 
     _output_rowset = _output_rs_writer->manual_build(rowset_meta);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index bdb3dd9075e..a96c497cdaf 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -138,7 +138,10 @@ void build_rowset_meta_with_spec_field(RowsetMeta& 
rowset_meta,
             spec_rowset_meta.is_segments_key_bounds_truncated());
     std::vector<KeyBoundsPB> segments_key_bounds;
     spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
-    rowset_meta.set_segments_key_bounds(segments_key_bounds);
+    // Preserve source layout: if source was aggregated (size 1), 
re-aggregating
+    // the single entry is a no-op that also keeps the flag consistent.
+    rowset_meta.set_segments_key_bounds(segments_key_bounds,
+                                        
spec_rowset_meta.is_segments_key_bounds_aggregated());
     std::vector<uint32_t> num_segment_rows;
     spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
     rowset_meta.set_num_segment_rows(num_segment_rows);
@@ -1074,7 +1077,9 @@ Status 
BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
                                      _total_index_size);
     rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
     rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
-    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
+    bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds &&
+                                !_context.enable_unique_key_merge_on_write;
+    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, 
aggregate_key_bounds);
     // TODO write zonemap to meta
     rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
     rowset_meta->set_creation_time(time(nullptr));
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 2d2a6267ff8..d02727bd3cf 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -302,6 +302,10 @@ public:
         return _rowset_meta->is_segments_key_bounds_truncated();
     }
 
+    bool is_segments_key_bounds_aggregated() const {
+        return _rowset_meta->is_segments_key_bounds_aggregated();
+    }
+
     bool check_rowset_segment();
 
     [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
diff --git a/be/src/olap/rowset/rowset_meta.cpp 
b/be/src/olap/rowset/rowset_meta.cpp
index fd3647ca8e7..7744a382d75 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -291,11 +291,31 @@ int64_t RowsetMeta::segment_file_size(int seg_id) const {
                    : -1;
 }
 
-void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& 
segments_key_bounds) {
-    for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
-        KeyBoundsPB* new_key_bounds = 
_rowset_meta_pb.add_segments_key_bounds();
-        *new_key_bounds = key_bounds;
+void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& 
segments_key_bounds,
+                                         bool aggregate_into_single) {
+    _rowset_meta_pb.clear_segments_key_bounds();
+    bool did_aggregate = aggregate_into_single && !segments_key_bounds.empty();
+    if (did_aggregate) {
+        const std::string* overall_min = 
&segments_key_bounds.front().min_key();
+        const std::string* overall_max = 
&segments_key_bounds.front().max_key();
+        for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
+            if (key_bounds.min_key() < *overall_min) {
+                overall_min = &key_bounds.min_key();
+            }
+            if (key_bounds.max_key() > *overall_max) {
+                overall_max = &key_bounds.max_key();
+            }
+        }
+        KeyBoundsPB* aggregated = _rowset_meta_pb.add_segments_key_bounds();
+        aggregated->set_min_key(*overall_min);
+        aggregated->set_max_key(*overall_max);
+    } else {
+        for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
+            KeyBoundsPB* new_key_bounds = 
_rowset_meta_pb.add_segments_key_bounds();
+            *new_key_bounds = key_bounds;
+        }
     }
+    set_segments_key_bounds_aggregated(did_aggregate);
 
     int32_t truncation_threshold = 
config::segments_key_bounds_truncation_threshold;
     if (config::random_segments_key_bounds_truncation) {
@@ -328,6 +348,11 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& 
other) {
     set_total_disk_size(data_disk_size() + index_disk_size());
     set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
                                       
other.is_segments_key_bounds_truncated());
+    // merge_rowset_meta is used in the MOW partial-update publish path, which 
relies
+    // on per-segment bounds. Aggregation should never be enabled for MOW 
rowsets,
+    // so we do not expect either side to be aggregated here.
+    DCHECK(!is_segments_key_bounds_aggregated() && 
!other.is_segments_key_bounds_aggregated())
+            << "merge_rowset_meta encountered aggregated key bounds";
     if (_rowset_meta_pb.num_segment_rows_size() > 0) {
         if (other.num_segments() > 0) {
             if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 9f052cf5e10..fcbd1f1f701 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -355,6 +355,17 @@ public:
         _rowset_meta_pb.set_segments_key_bounds_truncated(truncated);
     }
 
+    // When true, `segments_key_bounds` holds a single aggregated
+    // [rowset_min, rowset_max] entry instead of per-segment bounds.
+    bool is_segments_key_bounds_aggregated() const {
+        return _rowset_meta_pb.has_segments_key_bounds_aggregated() &&
+               _rowset_meta_pb.segments_key_bounds_aggregated();
+    }
+
+    void set_segments_key_bounds_aggregated(bool aggregated) {
+        _rowset_meta_pb.set_segments_key_bounds_aggregated(aggregated);
+    }
+
     bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
         // for compatibility, old version has not segment key bounds
         if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
@@ -372,7 +383,10 @@ public:
         return true;
     }
 
-    void set_segments_key_bounds(const std::vector<KeyBoundsPB>& 
segments_key_bounds);
+    // If `aggregate_into_single` is true, collapse per-segment bounds into a 
single
+    // [rowset_min, rowset_max] entry and mark this rowset as aggregated.
+    void set_segments_key_bounds(const std::vector<KeyBoundsPB>& 
segments_key_bounds,
+                                 bool aggregate_into_single = false);
 
     void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) {
         *_rowset_meta_pb.add_segments_key_bounds() = 
std::move(segments_key_bounds);
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index b28aa478356..450a0701822 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -227,8 +227,9 @@ Result<std::vector<PendingRowsetGuard>> 
SnapshotManager::convert_rowset_ids(
             // src be local rowset
             RowsetId rowset_id = _engine.next_rowset_id();
             guards.push_back(_engine.pending_local_rowsets().add(rowset_id));
-            RETURN_IF_ERROR_RESULT(_rename_rowset_id(visible_rowset, 
clone_dir, tablet_schema,
-                                                     rowset_id, rowset_meta));
+            RETURN_IF_ERROR_RESULT(_rename_rowset_id(
+                    visible_rowset, clone_dir, tablet_schema, rowset_id, 
rowset_meta,
+                    new_tablet_meta_pb.enable_unique_key_merge_on_write()));
             RowsetId src_rs_id;
             if (visible_rowset.rowset_id() > 0) {
                 src_rs_id.init(visible_rowset.rowset_id());
@@ -269,8 +270,9 @@ Result<std::vector<PendingRowsetGuard>> 
SnapshotManager::convert_rowset_ids(
             // src be local rowset
             RowsetId rowset_id = _engine.next_rowset_id();
             guards.push_back(_engine.pending_local_rowsets().add(rowset_id));
-            RETURN_IF_ERROR_RESULT(_rename_rowset_id(stale_rowset, clone_dir, 
tablet_schema,
-                                                     rowset_id, rowset_meta));
+            RETURN_IF_ERROR_RESULT(_rename_rowset_id(
+                    stale_rowset, clone_dir, tablet_schema, rowset_id, 
rowset_meta,
+                    new_tablet_meta_pb.enable_unique_key_merge_on_write()));
             RowsetId src_rs_id;
             if (stale_rowset.rowset_id() > 0) {
                 src_rs_id.init(stale_rowset.rowset_id());
@@ -324,7 +326,8 @@ Result<std::vector<PendingRowsetGuard>> 
SnapshotManager::convert_rowset_ids(
 Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
                                           const std::string& new_tablet_path,
                                           TabletSchemaSPtr tablet_schema, 
const RowsetId& rowset_id,
-                                          RowsetMetaPB* new_rs_meta_pb) {
+                                          RowsetMetaPB* new_rs_meta_pb,
+                                          bool 
enable_unique_key_merge_on_write) {
     Status st = Status::OK();
     RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
     rowset_meta->init_from_pb(rs_meta_pb);
@@ -350,6 +353,9 @@ Status SnapshotManager::_rename_rowset_id(const 
RowsetMetaPB& rs_meta_pb,
     context.newest_write_timestamp = org_rowset_meta->newest_write_timestamp();
     // keep segments_overlap same as origin rowset
     context.segments_overlap = rowset_meta->segments_overlap();
+    // propagate MOW flag so that non-MOW key-bounds aggregation is not applied
+    // when restoring a MOW tablet's rowset
+    context.enable_unique_key_merge_on_write = 
enable_unique_key_merge_on_write;
 
     auto rs_writer = DORIS_TRY(RowsetFactory::create_rowset_writer(_engine, 
context, false));
 
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 01303dac4f2..f63c62215d5 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -132,7 +132,7 @@ private:
 
     Status _rename_rowset_id(const RowsetMetaPB& rs_meta_pb, const 
std::string& new_tablet_path,
                              TabletSchemaSPtr tablet_schema, const RowsetId& 
next_id,
-                             RowsetMetaPB* new_rs_meta_pb);
+                             RowsetMetaPB* new_rs_meta_pb, bool 
enable_unique_key_merge_on_write);
 
     Status _rename_index_ids(TabletSchemaPB& schema_pb,
                              const TabletSchemaSPtr& tablet_schema) const;
diff --git a/be/src/olap/task/index_builder.cpp 
b/be/src/olap/task/index_builder.cpp
index 1dc0c5e7565..2744f953ec9 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -307,7 +307,10 @@ Status IndexBuilder::update_inverted_index_info() {
         RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds));
         rowset_meta->set_segments_key_bounds_truncated(
                 input_rowset_meta->is_segments_key_bounds_truncated());
-        rowset_meta->set_segments_key_bounds(key_bounds);
+        // preserve aggregated layout via the setter so the aggregated flag is 
not
+        // clobbered by set_segments_key_bounds's default reset path.
+        rowset_meta->set_segments_key_bounds(
+                key_bounds, 
input_rowset_meta->is_segments_key_bounds_aggregated());
         std::vector<uint32_t> num_segment_rows;
         input_rowset_meta->get_num_segment_rows(&num_segment_rows);
         rowset_meta->set_num_segment_rows(num_segment_rows);
diff --git a/be/test/olap/rowset/rowset_meta_test.cpp 
b/be/test/olap/rowset/rowset_meta_test.cpp
index c78b5803f03..768ff00169e 100644
--- a/be/test/olap/rowset/rowset_meta_test.cpp
+++ b/be/test/olap/rowset/rowset_meta_test.cpp
@@ -303,4 +303,156 @@ TEST_F(RowsetMetaTest, TestMergeRowsetMetaBothEmpty) {
     EXPECT_EQ(rowset_meta_1.num_segments(), 5);
 }
 
+TEST_F(RowsetMetaTest, TestSegmentsKeyBoundsAggregation) {
+    auto make_bounds = [](std::string min_key, std::string max_key) {
+        KeyBoundsPB kb;
+        kb.set_min_key(std::move(min_key));
+        kb.set_max_key(std::move(max_key));
+        return kb;
+    };
+
+    // Prepare three per-segment bounds whose overall min is "a01" and overall 
max is "z99".
+    // Intentionally unordered so that the aggregation must scan all entries.
+    std::vector<KeyBoundsPB> per_segment;
+    per_segment.push_back(make_bounds("m50", "z99"));
+    per_segment.push_back(make_bounds("a01", "k10"));
+    per_segment.push_back(make_bounds("f20", "r80"));
+
+    // Save and restore truncation config to keep the test deterministic.
+    int32_t saved_truncation = 
config::segments_key_bounds_truncation_threshold;
+    config::segments_key_bounds_truncation_threshold = -1;
+    auto restore = std::shared_ptr<void>(nullptr, [&](void*) {
+        config::segments_key_bounds_truncation_threshold = saved_truncation;
+    });
+
+    // 1. aggregate=true -> single [overall_min, overall_max] entry, flag set.
+    {
+        RowsetMeta rs_meta;
+        rs_meta.set_num_segments(per_segment.size());
+        rs_meta.set_segments_key_bounds(per_segment, 
/*aggregate_into_single=*/true);
+
+        std::vector<KeyBoundsPB> out;
+        rs_meta.get_segments_key_bounds(&out);
+        ASSERT_EQ(out.size(), 1);
+        EXPECT_EQ(out[0].min_key(), "a01");
+        EXPECT_EQ(out[0].max_key(), "z99");
+        EXPECT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+
+        // first_key/last_key must still return the global min/max.
+        KeyBoundsPB first;
+        KeyBoundsPB last;
+        ASSERT_TRUE(rs_meta.get_first_segment_key_bound(&first));
+        ASSERT_TRUE(rs_meta.get_last_segment_key_bound(&last));
+        EXPECT_EQ(first.min_key(), "a01");
+        EXPECT_EQ(last.max_key(), "z99");
+    }
+
+    // 2. aggregate=false (default) -> per-segment entries preserved, flag 
unset.
+    {
+        RowsetMeta rs_meta;
+        rs_meta.set_num_segments(per_segment.size());
+        rs_meta.set_segments_key_bounds(per_segment);
+
+        std::vector<KeyBoundsPB> out;
+        rs_meta.get_segments_key_bounds(&out);
+        ASSERT_EQ(out.size(), per_segment.size());
+        EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+        for (size_t i = 0; i < per_segment.size(); ++i) {
+            EXPECT_EQ(out[i].min_key(), per_segment[i].min_key());
+            EXPECT_EQ(out[i].max_key(), per_segment[i].max_key());
+        }
+    }
+
+    // 3. aggregate=true with empty input -> nothing written, flag untouched.
+    {
+        RowsetMeta rs_meta;
+        rs_meta.set_segments_key_bounds({}, /*aggregate_into_single=*/true);
+
+        std::vector<KeyBoundsPB> out;
+        rs_meta.get_segments_key_bounds(&out);
+        EXPECT_EQ(out.size(), 0);
+        EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+    }
+
+    // 4. aggregate=true called twice -> result reflects the latest call only.
+    {
+        RowsetMeta rs_meta;
+        rs_meta.set_segments_key_bounds(per_segment, 
/*aggregate_into_single=*/true);
+        std::vector<KeyBoundsPB> second;
+        second.push_back(make_bounds("b00", "c00"));
+        rs_meta.set_segments_key_bounds(second, 
/*aggregate_into_single=*/true);
+
+        std::vector<KeyBoundsPB> out;
+        rs_meta.get_segments_key_bounds(&out);
+        ASSERT_EQ(out.size(), 1);
+        EXPECT_EQ(out[0].min_key(), "b00");
+        EXPECT_EQ(out[0].max_key(), "c00");
+        EXPECT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+    }
+
+    // 5. aggregated flag must be reset when switching from aggregate=true to
+    //    aggregate=false on the same instance.
+    {
+        RowsetMeta rs_meta;
+        rs_meta.set_segments_key_bounds(per_segment, 
/*aggregate_into_single=*/true);
+        ASSERT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+
+        rs_meta.set_segments_key_bounds(per_segment, 
/*aggregate_into_single=*/false);
+        EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+
+        std::vector<KeyBoundsPB> out;
+        rs_meta.get_segments_key_bounds(&out);
+        EXPECT_EQ(out.size(), per_segment.size());
+    }
+
+    // 6. aggregated flag must be reset when calling with aggregate=true but an
+    //    empty input after a prior aggregated call.
+    {
+        RowsetMeta rs_meta;
+        rs_meta.set_segments_key_bounds(per_segment, 
/*aggregate_into_single=*/true);
+        ASSERT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+
+        rs_meta.set_segments_key_bounds({}, /*aggregate_into_single=*/true);
+        EXPECT_FALSE(rs_meta.is_segments_key_bounds_aggregated());
+
+        std::vector<KeyBoundsPB> out;
+        rs_meta.get_segments_key_bounds(&out);
+        EXPECT_TRUE(out.empty());
+    }
+}
+
+TEST_F(RowsetMetaTest, TestSegmentsKeyBoundsAggregationTruncation) {
+    // Aggregated entry is still subject to truncation.
+    int32_t saved_truncation = 
config::segments_key_bounds_truncation_threshold;
+    bool saved_random = config::random_segments_key_bounds_truncation;
+    config::segments_key_bounds_truncation_threshold = 4;
+    config::random_segments_key_bounds_truncation = false;
+    auto restore = std::shared_ptr<void>(nullptr, [&](void*) {
+        config::segments_key_bounds_truncation_threshold = saved_truncation;
+        config::random_segments_key_bounds_truncation = saved_random;
+    });
+
+    auto make_bounds = [](std::string min_key, std::string max_key) {
+        KeyBoundsPB kb;
+        kb.set_min_key(std::move(min_key));
+        kb.set_max_key(std::move(max_key));
+        return kb;
+    };
+
+    std::vector<KeyBoundsPB> per_segment;
+    per_segment.push_back(make_bounds("aaaaaaa", "bbbbbbb"));
+    per_segment.push_back(make_bounds("ccccccc", "ddddddd"));
+
+    RowsetMeta rs_meta;
+    rs_meta.set_segments_key_bounds(per_segment, 
/*aggregate_into_single=*/true);
+
+    std::vector<KeyBoundsPB> out;
+    rs_meta.get_segments_key_bounds(&out);
+    ASSERT_EQ(out.size(), 1);
+    EXPECT_EQ(out[0].min_key(), std::string("aaaa"));
+    EXPECT_EQ(out[0].max_key(), std::string("dddd"));
+    EXPECT_TRUE(rs_meta.is_segments_key_bounds_aggregated());
+    EXPECT_TRUE(rs_meta.is_segments_key_bounds_truncated());
+}
+
 } // namespace doris
diff --git a/be/test/olap/segments_key_bounds_truncation_test.cpp 
b/be/test/olap/segments_key_bounds_truncation_test.cpp
index faca751270b..e87212ecfa8 100644
--- a/be/test/olap/segments_key_bounds_truncation_test.cpp
+++ b/be/test/olap/segments_key_bounds_truncation_test.cpp
@@ -48,6 +48,7 @@ private:
     std::string absolute_dir;
     std::unique_ptr<DataDir> data_dir;
     int cur_version {2};
+    bool _saved_aggregate_non_mow_key_bounds {false};
 
 public:
     void SetUp() override {
@@ -61,9 +62,15 @@ public:
         data_dir = std::make_unique<DataDir>(*engine_ref, kSegmentDir);
         ASSERT_TRUE(data_dir->update_capacity().ok());
         ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+
+        // This suite asserts per-segment key bounds; force the non-MOW 
aggregation
+        // feature off so it is isolated from that orthogonal code path.
+        _saved_aggregate_non_mow_key_bounds = 
config::enable_aggregate_non_mow_key_bounds;
+        config::enable_aggregate_non_mow_key_bounds = false;
     }
 
     void TearDown() override {
+        config::enable_aggregate_non_mow_key_bounds = 
_saved_aggregate_non_mow_key_bounds;
         
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kSegmentDir).ok());
         engine_ref = nullptr;
         ExecEnv::GetInstance()->set_storage_engine(nullptr);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 073185a25f6..390f50e583c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -140,6 +140,11 @@ message RowsetMetaPB {
     // rows count for each segment
     repeated int64 num_segment_rows = 56;
 
+    // If true, `segments_key_bounds` contains a single aggregated
+    // [rowset_min_key, rowset_max_key] entry instead of per-segment bounds.
+    // Only applies to non-MOW rowsets to reduce meta size on cloud FDB.
+    optional bool segments_key_bounds_aggregated = 57;
+
     // For cloud
     // for data recycling
     optional int64 txn_expiration = 1000;
@@ -244,6 +249,11 @@ message RowsetMetaCloudPB {
     // rows count for each segment
     repeated int64 num_segment_rows = 56;
 
+    // If true, `segments_key_bounds` contains a single aggregated
+    // [rowset_min_key, rowset_max_key] entry instead of per-segment bounds.
+    // Only applies to non-MOW rowsets to reduce meta size on cloud FDB.
+    optional bool segments_key_bounds_aggregated = 57;
+
     // cloud
     // the field is a vector, rename it
     repeated int64 segments_file_size = 100;
diff --git a/regression-test/suites/data_model_p0/duplicate/test_non_mow_key_bounds_aggregation.groovy b/regression-test/suites/data_model_p0/duplicate/test_non_mow_key_bounds_aggregation.groovy
new file mode 100644
index 00000000000..5cacee3e4e6
--- /dev/null
+++ b/regression-test/suites/data_model_p0/duplicate/test_non_mow_key_bounds_aggregation.groovy
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Http
+
+// Verifies that when `enable_aggregate_non_mow_key_bounds` is on, non-MOW
+// rowsets collapse per-segment key bounds into a single [min, max] entry
+// while MOW rowsets keep per-segment bounds regardless of the config.
+suite("test_non_mow_key_bounds_aggregation", "nonConcurrent") {
+
+    // see be/src/util/key_util.h:50
+    def keyNormalMarker = new String(new Byte[]{2})
+
+    // Finds the rs_metas entry whose end_version equals `version` in the JSON fetched
+    // from the MetaUrl of the first tablet listed for `table`; returns null if none matches.
+    // Outside cloud mode the meta is fetched once. In cloud mode a miss is retried after a
+    // one-second sleep, up to 60 more times.
+    def fetchRowsetMetaAtVersion = { String table, int version ->
+        def metaUrl = sql_return_maparray("show tablets from ${table};").get(0).MetaUrl
+        // One immediate attempt everywhere, plus 60 one-second retries in cloud mode only.
+        int maxAttempts = isCloudMode() ? 61 : 1
+        for (int attempt = 0; attempt < maxAttempts; attempt++) {
+            if (attempt > 0) {
+                Thread.sleep(1000)
+            }
+            def jsonMeta = Http.GET(metaUrl, true, false)
+            for (def meta : jsonMeta.rs_metas) {
+                if (meta.end_version as int == version) {
+                    return meta
+                }
+            }
+        }
+        return null
+    }
+
+    def dupTable = "test_non_mow_key_bounds_aggregation_dup"
+    def mowTable = "test_non_mow_key_bounds_aggregation_mow"
+
+    sql """ DROP TABLE IF EXISTS ${dupTable} force; """
+    sql """ CREATE TABLE ${dupTable} (
+        `k` varchar(65533) NOT NULL,
+        `v` int)
+        DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES("replication_num" = "1",
+                "disable_auto_compaction" = "true"); """
+
+    sql """ DROP TABLE IF EXISTS ${mowTable} force; """
+    sql """ CREATE TABLE ${mowTable} (
+        `k` varchar(65533) NOT NULL,
+        `v` int)
+        UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES("replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true"); """
+
+    int dupVersion = 1
+    int mowVersion = 1
+
+    // Disable truncation so bounds content equals the raw keys (with marker prefix).
+    // Enable aggregation to exercise the new code path for non-MOW rowsets.
+    def configOn = [
+        segments_key_bounds_truncation_threshold: -1,
+        enable_aggregate_non_mow_key_bounds     : true,
+    ]
+
+    setBeConfigTemporary(configOn) {
+        // Case 1: non-MOW with config on -> aggregated single [min, max] entry.
+        String dupK1 = "aaaaaaaa"
+        String dupK2 = "mmmmmmmm"
+        String dupK3 = "zzzzzzzz"
+        sql """insert into ${dupTable} values("${dupK1}", 1), ("${dupK3}", 3), ("${dupK2}", 2);"""
+        def dupMeta = fetchRowsetMetaAtVersion(dupTable, ++dupVersion)
+        assertNotNull(dupMeta)
+        logger.info("dup rowset meta (config on): ${dupMeta}")
+        assertTrue(dupMeta.segments_key_bounds_aggregated as boolean)
+        assertEquals(1, dupMeta.segments_key_bounds.size())
+        assertEquals(keyNormalMarker + dupK1, dupMeta.segments_key_bounds.get(0).min_key)
+        assertEquals(keyNormalMarker + dupK3, dupMeta.segments_key_bounds.get(0).max_key)
+
+        // Case 2: MOW with config on -> per-segment bounds preserved, flag unset.
+        String mowK1 = "mmmm1111"
+        String mowK2 = "mmmm2222"
+        sql """insert into ${mowTable} values("${mowK1}", 1), ("${mowK2}", 2);"""
+        def mowMeta = fetchRowsetMetaAtVersion(mowTable, ++mowVersion)
+        assertNotNull(mowMeta)
+        logger.info("mow rowset meta (config on): ${mowMeta}")
+        assertFalse((mowMeta.segments_key_bounds_aggregated ?: false) as boolean)
+        assertEquals(mowMeta.num_segments as int, mowMeta.segments_key_bounds.size())
+    }
+
+    // Case 3: non-MOW with config off -> per-segment bounds (size == num_segments), flag unset.
+    def configOff = [
+        segments_key_bounds_truncation_threshold: -1,
+        enable_aggregate_non_mow_key_bounds     : false,
+    ]
+    setBeConfigTemporary(configOff) {
+        String dupK1 = "bbbb0000"
+        String dupK2 = "bbbb9999"
+        sql """insert into ${dupTable} values("${dupK1}", 10), ("${dupK2}", 20);"""
+        def dupMeta = fetchRowsetMetaAtVersion(dupTable, ++dupVersion)
+        assertNotNull(dupMeta)
+        logger.info("dup rowset meta (config off): ${dupMeta}")
+        assertFalse((dupMeta.segments_key_bounds_aggregated ?: false) as boolean)
+        assertEquals(dupMeta.num_segments as int, dupMeta.segments_key_bounds.size())
+    }
+
+    // Case 4: rebuilding an already-aggregated non-MOW rowset through the index rewrite
+    // path must preserve the aggregated flag and single-entry layout. Regression guard for
+    // IndexBuilder clobbering the flag via set_segments_key_bounds's default parameter.
+    // IndexBuilder runs on local rowsets only, so skip this case in cloud mode.
+    if (!isCloudMode()) {
+        def idxTable = "test_non_mow_key_bounds_aggregation_idx"
+        sql """ DROP TABLE IF EXISTS ${idxTable} force; """
+        sql """ CREATE TABLE ${idxTable} (
+            `k` varchar(65533) NOT NULL,
+            `v` int,
+            INDEX idx_v (v) USING INVERTED)
+            DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            PROPERTIES("replication_num" = "1",
+                    "disable_auto_compaction" = "true",
+                    "light_schema_change" = "true"); """
+
+        // Polls the newest SHOW ALTER TABLE COLUMN row for `tbl` once per second, at most
+        // `timeoutSec` times. Returns when no row is listed or field 9 of the row is "FINISHED";
+        // throws IllegalStateException when that field is "CANCELLED" or when the polls run out.
+        // NOTE(review): index 9 is presumably the job State column - confirm against the FE output.
+        def waitAlterFinish = { String tbl, int timeoutSec ->
+            for (int i = 0; i < timeoutSec; i++) {
+                def rs = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${tbl}" ORDER BY CreateTime DESC LIMIT 1;"""
+                if (rs.size() == 0 || rs[0][9] == "FINISHED") return
+                if (rs[0][9] == "CANCELLED") throw new IllegalStateException("alter cancelled: ${rs}")
+                Thread.sleep(1000)
+            }
+            throw new IllegalStateException("waitAlterFinish timeout")
+        }
+
+        setBeConfigTemporary(configOn) {
+            String idxK1 = "idx00000"
+            String idxK2 = "idxzzzzz"
+            sql """insert into ${idxTable} values("${idxK1}", 1), ("${idxK2}", 2);"""
+            int idxVersion = 2
+            def beforeMeta = fetchRowsetMetaAtVersion(idxTable, idxVersion)
+            assertNotNull(beforeMeta)
+            logger.info("idx rowset meta before DROP INDEX: ${beforeMeta}")
+            assertTrue(beforeMeta.segments_key_bounds_aggregated as boolean)
+            assertEquals(1, beforeMeta.segments_key_bounds.size())
+
+            // DROP INDEX drives IndexBuilder, which rebuilds rowset meta in place.
+            sql """ALTER TABLE ${idxTable} DROP INDEX idx_v;"""
+            waitAlterFinish(idxTable, 120)
+
+            def afterMeta = fetchRowsetMetaAtVersion(idxTable, idxVersion)
+            assertNotNull(afterMeta)
+            logger.info("idx rowset meta after DROP INDEX: ${afterMeta}")
+            assertTrue(afterMeta.segments_key_bounds_aggregated as boolean)
+            assertEquals(1, afterMeta.segments_key_bounds.size())
+            assertEquals(beforeMeta.segments_key_bounds.get(0).min_key,
+                         afterMeta.segments_key_bounds.get(0).min_key)
+            assertEquals(beforeMeta.segments_key_bounds.get(0).max_key,
+                         afterMeta.segments_key_bounds.get(0).max_key)
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to