This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 27f574a  feat: add bucket and level statistics to ManifestFileMeta (#350)
27f574a is described below

commit 27f574a984f9b6e2136c523d77779815cb2527a1
Author: shyjsarah <[email protected]>
AuthorDate: Tue Jun 2 09:23:16 2026 +0800

    feat: add bucket and level statistics to ManifestFileMeta (#350)
---
 .../src/spec/avro/manifest_file_meta_decode.rs     |  22 ++++
 crates/paimon/src/spec/manifest_file_meta.rs       |  99 ++++++++++++++
 crates/paimon/src/spec/manifest_list.rs            | 144 +++++++++++++++++++++
 crates/paimon/src/spec/objects_file.rs             |   4 +
 crates/paimon/src/table/table_commit.rs            |  56 +++++++-
 5 files changed, 324 insertions(+), 1 deletion(-)

diff --git a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs 
b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
index 67d4f2e..7b30c52 100644
--- a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
+++ b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
@@ -33,6 +33,10 @@ impl AvroRecordDecode for ManifestFileMeta {
         let mut num_deleted_files: Option<i64> = None;
         let mut partition_stats: Option<BinaryTableStats> = None;
         let mut schema_id: Option<i64> = None;
+        let mut min_bucket: Option<i32> = None;
+        let mut max_bucket: Option<i32> = None;
+        let mut min_level: Option<i32> = None;
+        let mut max_level: Option<i32> = None;
         let mut min_row_id: Option<i64> = None;
         let mut max_row_id: Option<i64> = None;
 
@@ -52,6 +56,10 @@ impl AvroRecordDecode for ManifestFileMeta {
                         decode_nullable_binary_table_stats(cursor, 
&field.schema, field.nullable)?;
                 }
                 "_SCHEMA_ID" => schema_id = Some(read_long_field(cursor, 
field.nullable)?),
+                "_MIN_BUCKET" => min_bucket = read_optional_int(cursor, 
field.nullable)?,
+                "_MAX_BUCKET" => max_bucket = read_optional_int(cursor, 
field.nullable)?,
+                "_MIN_LEVEL" => min_level = read_optional_int(cursor, 
field.nullable)?,
+                "_MAX_LEVEL" => max_level = read_optional_int(cursor, 
field.nullable)?,
                 "_MIN_ROW_ID" => min_row_id = read_optional_long(cursor, 
field.nullable)?,
                 "_MAX_ROW_ID" => max_row_id = read_optional_long(cursor, 
field.nullable)?,
                 _ => skip_nullable_field(cursor, &field.schema, 
field.nullable)?,
@@ -66,12 +74,26 @@ impl AvroRecordDecode for ManifestFileMeta {
             num_deleted_files.unwrap_or(0),
             partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], 
vec![], vec![])),
             schema_id.unwrap_or(0),
+            min_bucket,
+            max_bucket,
+            min_level,
+            max_level,
             min_row_id,
             max_row_id,
         ))
     }
 }
 
+fn read_optional_int(cursor: &mut AvroCursor, nullable: bool) -> 
crate::Result<Option<i32>> {
+    if nullable {
+        let idx = cursor.read_union_index()?;
+        if idx == 0 {
+            return Ok(None);
+        }
+    }
+    Ok(Some(cursor.read_int()?))
+}
+
 fn read_optional_long(cursor: &mut AvroCursor, nullable: bool) -> 
crate::Result<Option<i64>> {
     if nullable {
         let idx = cursor.read_union_index()?;
diff --git a/crates/paimon/src/spec/manifest_file_meta.rs 
b/crates/paimon/src/spec/manifest_file_meta.rs
index f0dddb7..51bb81c 100644
--- a/crates/paimon/src/spec/manifest_file_meta.rs
+++ b/crates/paimon/src/spec/manifest_file_meta.rs
@@ -51,6 +51,43 @@ pub struct ManifestFileMeta {
     #[serde(rename = "_SCHEMA_ID")]
     schema_id: i64,
 
+    /// minimum bucket covered by entries in this manifest, used by the Java reader to
+    /// prune manifests that do not overlap a requested bucket. Always `None` together
+    /// with `max_bucket` for back-compat manifests written before bucket statistics
+    /// were introduced (apache/paimon#5345).
+    #[serde(
+        rename = "_MIN_BUCKET",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    min_bucket: Option<i32>,
+
+    /// maximum bucket covered by entries in this manifest. See `min_bucket`.
+    #[serde(
+        rename = "_MAX_BUCKET",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    max_bucket: Option<i32>,
+
+    /// minimum LSM level covered by entries in this manifest, used by the Java reader
+    /// for level-based pruning (e.g. compaction's level filter). Same back-compat note
+    /// as `min_bucket`.
+    #[serde(
+        rename = "_MIN_LEVEL",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    min_level: Option<i32>,
+
+    /// maximum LSM level covered by entries in this manifest. See `min_level`.
+    #[serde(
+        rename = "_MAX_LEVEL",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    max_level: Option<i32>,
+
     /// minimum row id covered by this manifest, when row tracking is enabled.
     #[serde(
         rename = "_MIN_ROW_ID",
@@ -110,6 +147,31 @@ impl ManifestFileMeta {
         self.version
     }
 
+    /// Get the minimum bucket covered by this manifest (None when bucket 
stats are absent,
+    /// e.g. manifests written before the field was introduced).
+    #[inline]
+    pub fn min_bucket(&self) -> Option<i32> {
+        self.min_bucket
+    }
+
+    /// Get the maximum bucket covered by this manifest (None when bucket 
stats are absent).
+    #[inline]
+    pub fn max_bucket(&self) -> Option<i32> {
+        self.max_bucket
+    }
+
+    /// Get the minimum LSM level covered by this manifest (None when level 
stats are absent).
+    #[inline]
+    pub fn min_level(&self) -> Option<i32> {
+        self.min_level
+    }
+
+    /// Get the maximum LSM level covered by this manifest (None when level 
stats are absent).
+    #[inline]
+    pub fn max_level(&self) -> Option<i32> {
+        self.max_level
+    }
+
     /// Get the minimum row id covered by this manifest (None when row 
tracking is disabled).
     #[inline]
     pub fn min_row_id(&self) -> Option<i64> {
@@ -122,6 +184,27 @@ impl ManifestFileMeta {
         self.max_row_id
     }
 
+    /// Attach bucket / level statistics aggregated from manifest entries.
+    ///
+    /// Use this in writers that have access to the entries that the manifest covers.
+    /// Setting all four to `None` is equivalent to leaving the stats absent (the Java
+    /// reader treats `null` here as "no information; do not prune").
+    #[inline]
+    #[must_use]
+    pub fn with_bucket_level_stats(
+        mut self,
+        min_bucket: Option<i32>,
+        max_bucket: Option<i32>,
+        min_level: Option<i32>,
+        max_level: Option<i32>,
+    ) -> Self {
+        self.min_bucket = min_bucket;
+        self.max_bucket = max_bucket;
+        self.min_level = min_level;
+        self.max_level = max_level;
+        self
+    }
+
     #[inline]
     pub fn new(
         file_name: String,
@@ -139,6 +222,10 @@ impl ManifestFileMeta {
             num_deleted_files,
             partition_stats,
             schema_id,
+            min_bucket: None,
+            max_bucket: None,
+            min_level: None,
+            max_level: None,
             min_row_id: None,
             max_row_id: None,
         }
@@ -154,6 +241,10 @@ impl ManifestFileMeta {
         num_deleted_files: i64,
         partition_stats: BinaryTableStats,
         schema_id: i64,
+        min_bucket: Option<i32>,
+        max_bucket: Option<i32>,
+        min_level: Option<i32>,
+        max_level: Option<i32>,
         min_row_id: Option<i64>,
         max_row_id: Option<i64>,
     ) -> ManifestFileMeta {
@@ -165,6 +256,10 @@ impl ManifestFileMeta {
             num_deleted_files,
             partition_stats,
             schema_id,
+            min_bucket,
+            max_bucket,
+            min_level,
+            max_level,
             min_row_id,
             max_row_id,
         }
@@ -192,6 +287,10 @@ pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", {
             ]
         }], "default": null},
         {"name": "_SCHEMA_ID", "type": "long"},
+        {"name": "_MIN_BUCKET", "type": ["null", "int"], "default": null},
+        {"name": "_MAX_BUCKET", "type": ["null", "int"], "default": null},
+        {"name": "_MIN_LEVEL", "type": ["null", "int"], "default": null},
+        {"name": "_MAX_LEVEL", "type": ["null", "int"], "default": null},
         {"name": "_MIN_ROW_ID", "type": ["null", "long"], "default": null},
         {"name": "_MAX_ROW_ID", "type": ["null", "long"], "default": null}
     ]
diff --git a/crates/paimon/src/spec/manifest_list.rs 
b/crates/paimon/src/spec/manifest_list.rs
index 16eb049..cf3985c 100644
--- a/crates/paimon/src/spec/manifest_list.rs
+++ b/crates/paimon/src/spec/manifest_list.rs
@@ -112,4 +112,148 @@ mod tests {
         let decoded = ManifestList::read(&file_io, path).await.unwrap();
         assert!(decoded.is_empty());
     }
+
+    /// Round-trip bucket / level statistics through Avro so future schema drift is caught
+    /// here, not in production. Matches the fields added in apache/paimon#5345.
+    #[tokio::test]
+    async fn test_manifest_list_roundtrip_preserves_bucket_level_stats() {
+        let file_io = test_file_io();
+        let path = "memory:/test_manifest_list_bucket_level/manifest-list-0";
+        file_io
+            .mkdirs("memory:/test_manifest_list_bucket_level/")
+            .await
+            .unwrap();
+
+        let value_bytes = vec![
+            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 
0, 0, 0, 0, 0, 129,
+        ];
+        let original = vec![ManifestFileMeta::new(
+            "manifest-bucket-level".to_string(),
+            4096,
+            3,
+            0,
+            BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), 
vec![Some(0)]),
+            0,
+        )
+        .with_bucket_level_stats(Some(-1), Some(7), Some(0), Some(5))];
+
+        ManifestList::write(&file_io, path, &original)
+            .await
+            .unwrap();
+        let decoded = ManifestList::read(&file_io, path).await.unwrap();
+        assert_eq!(decoded.len(), 1);
+        assert_eq!(decoded[0].min_bucket(), Some(-1));
+        assert_eq!(decoded[0].max_bucket(), Some(7));
+        assert_eq!(decoded[0].min_level(), Some(0));
+        assert_eq!(decoded[0].max_level(), Some(5));
+        // Sanity: nothing else changed.
+        assert_eq!(decoded[0].file_name(), "manifest-bucket-level");
+        assert_eq!(decoded[0].num_added_files(), 3);
+    }
+
+    /// Back-compat: a manifest list written without the bucket / level fields (e.g. by an
+    /// older Rust writer or any Java writer pre apache/paimon#5345) must decode into
+    /// `None` rather than failing or yielding bogus values.
+    #[tokio::test]
+    async fn test_manifest_list_decodes_legacy_without_bucket_level_fields() {
+        use apache_avro::{Codec, Schema, Writer};
+        use std::collections::HashMap;
+
+        let file_io = test_file_io();
+        let path = "memory:/test_manifest_list_legacy/manifest-list-0";
+        file_io
+            .mkdirs("memory:/test_manifest_list_legacy/")
+            .await
+            .unwrap();
+
+        // Avro schema with the pre-5345 shape: no _MIN/_MAX_BUCKET/LEVEL 
fields.
+        let legacy_schema = r#"["null", {
+            "type": "record",
+            "name": "record",
+            "namespace": "org.apache.paimon.avro.generated",
+            "fields": [
+                {"name": "_VERSION", "type": "int"},
+                {"name": "_FILE_NAME", "type": "string"},
+                {"name": "_FILE_SIZE", "type": "long"},
+                {"name": "_NUM_ADDED_FILES", "type": "long"},
+                {"name": "_NUM_DELETED_FILES", "type": "long"},
+                {"name": "_PARTITION_STATS", "type": ["null", {
+                    "type": "record",
+                    "name": "record__PARTITION_STATS",
+                    "fields": [
+                        {"name": "_MIN_VALUES", "type": "bytes"},
+                        {"name": "_MAX_VALUES", "type": "bytes"},
+                        {"name": "_NULL_COUNTS", "type": ["null", {"type": 
"array", "items": ["null", "long"]}], "default": null}
+                    ]
+                }], "default": null}
+            ]
+        }]"#;
+        let schema = Schema::parse_str(legacy_schema).unwrap();
+        let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+        let value_bytes = vec![0u8; 12];
+        let mut record: HashMap<String, apache_avro::types::Value> = 
HashMap::new();
+        record.insert("_VERSION".to_string(), 
apache_avro::types::Value::Int(2));
+        record.insert(
+            "_FILE_NAME".to_string(),
+            apache_avro::types::Value::String("manifest-legacy".to_string()),
+        );
+        record.insert(
+            "_FILE_SIZE".to_string(),
+            apache_avro::types::Value::Long(1024),
+        );
+        record.insert(
+            "_NUM_ADDED_FILES".to_string(),
+            apache_avro::types::Value::Long(2),
+        );
+        record.insert(
+            "_NUM_DELETED_FILES".to_string(),
+            apache_avro::types::Value::Long(0),
+        );
+        record.insert(
+            "_PARTITION_STATS".to_string(),
+            apache_avro::types::Value::Union(
+                1,
+                Box::new(apache_avro::types::Value::Record(vec![
+                    (
+                        "_MIN_VALUES".to_string(),
+                        apache_avro::types::Value::Bytes(value_bytes.clone()),
+                    ),
+                    (
+                        "_MAX_VALUES".to_string(),
+                        apache_avro::types::Value::Bytes(value_bytes.clone()),
+                    ),
+                    (
+                        "_NULL_COUNTS".to_string(),
+                        apache_avro::types::Value::Union(
+                            0,
+                            Box::new(apache_avro::types::Value::Null),
+                        ),
+                    ),
+                ])),
+            ),
+        );
+        let value = apache_avro::types::Value::Union(
+            1,
+            Box::new(apache_avro::types::Value::Record(
+                record.into_iter().collect(),
+            )),
+        );
+        let resolved = value.resolve(&schema).unwrap();
+        writer.append(resolved).unwrap();
+        let bytes = writer.into_inner().unwrap();
+        file_io
+            .new_output(path)
+            .unwrap()
+            .write(bytes::Bytes::from(bytes))
+            .await
+            .unwrap();
+
+        let decoded = ManifestList::read(&file_io, path).await.unwrap();
+        assert_eq!(decoded.len(), 1);
+        assert_eq!(decoded[0].file_name(), "manifest-legacy");
+        assert_eq!(decoded[0].min_bucket(), None);
+        assert_eq!(decoded[0].max_bucket(), None);
+        assert_eq!(decoded[0].min_level(), None);
+        assert_eq!(decoded[0].max_level(), None);
+    }
 }
diff --git a/crates/paimon/src/spec/objects_file.rs 
b/crates/paimon/src/spec/objects_file.rs
index 9056a2e..81e4377 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -83,6 +83,10 @@ mod tests {
             0,
             BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), 
vec![Some(1)]),
             0,
+            None,
+            None,
+            None,
+            None,
             Some(100),
             Some(199),
         )];
diff --git a/crates/paimon/src/table/table_commit.rs 
b/crates/paimon/src/table/table_commit.rs
index 664b654..274dd96 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -520,11 +520,24 @@ impl TableCommit {
 
         let mut added_file_count: i64 = 0;
         let mut deleted_file_count: i64 = 0;
+        // Bucket / level pruning stats; left as None when entries is empty so back-compat
+        // readers (Java < apache/paimon#5345 or older Rust writers) see the same shape
+        // they would for a pre-feature manifest.
+        let mut min_bucket: Option<i32> = None;
+        let mut max_bucket: Option<i32> = None;
+        let mut min_level: Option<i32> = None;
+        let mut max_level: Option<i32> = None;
         for entry in entries {
             match entry.kind() {
                 FileKind::Add => added_file_count += 1,
                 FileKind::Delete => deleted_file_count += 1,
             }
+            let b = entry.bucket();
+            min_bucket = Some(min_bucket.map_or(b, |cur| cur.min(b)));
+            max_bucket = Some(max_bucket.map_or(b, |cur| cur.max(b)));
+            let l = entry.file().level;
+            min_level = Some(min_level.map_or(l, |cur| cur.min(l)));
+            max_level = Some(max_level.map_or(l, |cur| cur.max(l)));
         }
 
         // Get file size
@@ -539,7 +552,8 @@ impl TableCommit {
             deleted_file_count,
             partition_stats,
             self.table.schema().id(),
-        ))
+        )
+        .with_bucket_level_stats(min_bucket, max_bucket, min_level, max_level))
     }
 
     /// Check if this commit was already completed (idempotency).
@@ -1822,4 +1836,44 @@ mod tests {
             "Expected 'Delete conflict' error, got: {err_msg}"
         );
     }
+
+    /// `write_manifest_file` must aggregate min/max bucket and level across entries so the
+    /// Java reader can prune manifests by bucket / level (see apache/paimon#5345). This
+    /// drives a real commit so all the call-site plumbing is exercised end to end.
+    #[tokio::test]
+    async fn test_commit_writes_bucket_and_level_stats_into_manifest_list() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_commit_bucket_level_stats";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_commit(&file_io, table_path);
+
+        fn data_file_at_level(name: &str, level: i32) -> DataFileMeta {
+            let mut f = test_data_file(name, 1);
+            f.level = level;
+            f
+        }
+
+        // Two commit messages on different buckets, each carrying a file at a 
different
+        // level. Expected aggregate: bucket [0, 3], level [0, 2].
+        let messages = vec![
+            CommitMessage::new(vec![], 0, 
vec![data_file_at_level("data-b0.parquet", 0)]),
+            CommitMessage::new(vec![], 3, 
vec![data_file_at_level("data-b3.parquet", 2)]),
+        ];
+        commit.commit(messages).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string());
+        let snapshot = 
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        let delta_path = format!("{table_path}/manifest/{}", 
snapshot.delta_manifest_list());
+        let metas = ManifestList::read(&file_io, &delta_path).await.unwrap();
+        assert_eq!(
+            metas.len(),
+            1,
+            "expected a single manifest covering both entries"
+        );
+        assert_eq!(metas[0].min_bucket(), Some(0));
+        assert_eq!(metas[0].max_bucket(), Some(3));
+        assert_eq!(metas[0].min_level(), Some(0));
+        assert_eq!(metas[0].max_level(), Some(2));
+    }
 }

Reply via email to