This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 27f574a feat: add bucket and level statistics to ManifestFileMeta
(#350)
27f574a is described below
commit 27f574a984f9b6e2136c523d77779815cb2527a1
Author: shyjsarah <[email protected]>
AuthorDate: Tue Jun 2 09:23:16 2026 +0800
feat: add bucket and level statistics to ManifestFileMeta (#350)
---
.../src/spec/avro/manifest_file_meta_decode.rs | 22 ++++
crates/paimon/src/spec/manifest_file_meta.rs | 99 ++++++++++++++
crates/paimon/src/spec/manifest_list.rs | 144 +++++++++++++++++++++
crates/paimon/src/spec/objects_file.rs | 4 +
crates/paimon/src/table/table_commit.rs | 56 +++++++-
5 files changed, 324 insertions(+), 1 deletion(-)
diff --git a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
index 67d4f2e..7b30c52 100644
--- a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
+++ b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
@@ -33,6 +33,10 @@ impl AvroRecordDecode for ManifestFileMeta {
let mut num_deleted_files: Option<i64> = None;
let mut partition_stats: Option<BinaryTableStats> = None;
let mut schema_id: Option<i64> = None;
+ let mut min_bucket: Option<i32> = None;
+ let mut max_bucket: Option<i32> = None;
+ let mut min_level: Option<i32> = None;
+ let mut max_level: Option<i32> = None;
let mut min_row_id: Option<i64> = None;
let mut max_row_id: Option<i64> = None;
@@ -52,6 +56,10 @@ impl AvroRecordDecode for ManifestFileMeta {
decode_nullable_binary_table_stats(cursor,
&field.schema, field.nullable)?;
}
"_SCHEMA_ID" => schema_id = Some(read_long_field(cursor,
field.nullable)?),
+ "_MIN_BUCKET" => min_bucket = read_optional_int(cursor,
field.nullable)?,
+ "_MAX_BUCKET" => max_bucket = read_optional_int(cursor,
field.nullable)?,
+ "_MIN_LEVEL" => min_level = read_optional_int(cursor,
field.nullable)?,
+ "_MAX_LEVEL" => max_level = read_optional_int(cursor,
field.nullable)?,
"_MIN_ROW_ID" => min_row_id = read_optional_long(cursor,
field.nullable)?,
"_MAX_ROW_ID" => max_row_id = read_optional_long(cursor,
field.nullable)?,
_ => skip_nullable_field(cursor, &field.schema,
field.nullable)?,
@@ -66,12 +74,26 @@ impl AvroRecordDecode for ManifestFileMeta {
num_deleted_files.unwrap_or(0),
partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![],
vec![], vec![])),
schema_id.unwrap_or(0),
+ min_bucket,
+ max_bucket,
+ min_level,
+ max_level,
min_row_id,
max_row_id,
))
}
}
+fn read_optional_int(cursor: &mut AvroCursor, nullable: bool) ->
crate::Result<Option<i32>> {
+ if nullable {
+ let idx = cursor.read_union_index()?;
+ if idx == 0 {
+ return Ok(None);
+ }
+ }
+ Ok(Some(cursor.read_int()?))
+}
+
fn read_optional_long(cursor: &mut AvroCursor, nullable: bool) ->
crate::Result<Option<i64>> {
if nullable {
let idx = cursor.read_union_index()?;
diff --git a/crates/paimon/src/spec/manifest_file_meta.rs
b/crates/paimon/src/spec/manifest_file_meta.rs
index f0dddb7..51bb81c 100644
--- a/crates/paimon/src/spec/manifest_file_meta.rs
+++ b/crates/paimon/src/spec/manifest_file_meta.rs
@@ -51,6 +51,43 @@ pub struct ManifestFileMeta {
#[serde(rename = "_SCHEMA_ID")]
schema_id: i64,
+ /// minimum bucket covered by entries in this manifest, used by the Java
reader to
+ /// prune manifests that do not overlap a requested bucket. Always `None`
together
+ /// with `max_bucket` for back-compat manifests written before bucket
statistics
+ /// were introduced (apache/paimon#5345).
+ #[serde(
+ rename = "_MIN_BUCKET",
+ default,
+ skip_serializing_if = "Option::is_none"
+ )]
+ min_bucket: Option<i32>,
+
+ /// maximum bucket covered by entries in this manifest. See `min_bucket`.
+ #[serde(
+ rename = "_MAX_BUCKET",
+ default,
+ skip_serializing_if = "Option::is_none"
+ )]
+ max_bucket: Option<i32>,
+
+ /// minimum LSM level covered by entries in this manifest, used by the
Java reader
+ /// for level-based pruning (e.g. compaction's level filter). Same
back-compat note
+ /// as `min_bucket`.
+ #[serde(
+ rename = "_MIN_LEVEL",
+ default,
+ skip_serializing_if = "Option::is_none"
+ )]
+ min_level: Option<i32>,
+
+ /// maximum LSM level covered by entries in this manifest. See `min_level`.
+ #[serde(
+ rename = "_MAX_LEVEL",
+ default,
+ skip_serializing_if = "Option::is_none"
+ )]
+ max_level: Option<i32>,
+
/// minimum row id covered by this manifest, when row tracking is enabled.
#[serde(
rename = "_MIN_ROW_ID",
@@ -110,6 +147,31 @@ impl ManifestFileMeta {
self.version
}
+ /// Get the minimum bucket covered by this manifest (None when bucket
stats are absent,
+ /// e.g. manifests written before the field was introduced).
+ #[inline]
+ pub fn min_bucket(&self) -> Option<i32> {
+ self.min_bucket
+ }
+
+ /// Get the maximum bucket covered by this manifest (None when bucket
stats are absent).
+ #[inline]
+ pub fn max_bucket(&self) -> Option<i32> {
+ self.max_bucket
+ }
+
+ /// Get the minimum LSM level covered by this manifest (None when level
stats are absent).
+ #[inline]
+ pub fn min_level(&self) -> Option<i32> {
+ self.min_level
+ }
+
+ /// Get the maximum LSM level covered by this manifest (None when level
stats are absent).
+ #[inline]
+ pub fn max_level(&self) -> Option<i32> {
+ self.max_level
+ }
+
/// Get the minimum row id covered by this manifest (None when row
tracking is disabled).
#[inline]
pub fn min_row_id(&self) -> Option<i64> {
@@ -122,6 +184,27 @@ impl ManifestFileMeta {
self.max_row_id
}
+ /// Attach bucket / level statistics aggregated from manifest entries.
+ ///
+ /// Use this in writers that have access to the entries that the manifest
covers.
+ /// Setting all four to `None` is equivalent to leaving the stats absent
(the Java
+ /// reader treats `null` here as "no information; do not prune").
+ #[inline]
+ #[must_use]
+ pub fn with_bucket_level_stats(
+ mut self,
+ min_bucket: Option<i32>,
+ max_bucket: Option<i32>,
+ min_level: Option<i32>,
+ max_level: Option<i32>,
+ ) -> Self {
+ self.min_bucket = min_bucket;
+ self.max_bucket = max_bucket;
+ self.min_level = min_level;
+ self.max_level = max_level;
+ self
+ }
+
#[inline]
pub fn new(
file_name: String,
@@ -139,6 +222,10 @@ impl ManifestFileMeta {
num_deleted_files,
partition_stats,
schema_id,
+ min_bucket: None,
+ max_bucket: None,
+ min_level: None,
+ max_level: None,
min_row_id: None,
max_row_id: None,
}
@@ -154,6 +241,10 @@ impl ManifestFileMeta {
num_deleted_files: i64,
partition_stats: BinaryTableStats,
schema_id: i64,
+ min_bucket: Option<i32>,
+ max_bucket: Option<i32>,
+ min_level: Option<i32>,
+ max_level: Option<i32>,
min_row_id: Option<i64>,
max_row_id: Option<i64>,
) -> ManifestFileMeta {
@@ -165,6 +256,10 @@ impl ManifestFileMeta {
num_deleted_files,
partition_stats,
schema_id,
+ min_bucket,
+ max_bucket,
+ min_level,
+ max_level,
min_row_id,
max_row_id,
}
@@ -192,6 +287,10 @@ pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", {
]
}], "default": null},
{"name": "_SCHEMA_ID", "type": "long"},
+ {"name": "_MIN_BUCKET", "type": ["null", "int"], "default": null},
+ {"name": "_MAX_BUCKET", "type": ["null", "int"], "default": null},
+ {"name": "_MIN_LEVEL", "type": ["null", "int"], "default": null},
+ {"name": "_MAX_LEVEL", "type": ["null", "int"], "default": null},
{"name": "_MIN_ROW_ID", "type": ["null", "long"], "default": null},
{"name": "_MAX_ROW_ID", "type": ["null", "long"], "default": null}
]
diff --git a/crates/paimon/src/spec/manifest_list.rs
b/crates/paimon/src/spec/manifest_list.rs
index 16eb049..cf3985c 100644
--- a/crates/paimon/src/spec/manifest_list.rs
+++ b/crates/paimon/src/spec/manifest_list.rs
@@ -112,4 +112,148 @@ mod tests {
let decoded = ManifestList::read(&file_io, path).await.unwrap();
assert!(decoded.is_empty());
}
+
+ /// Round-trip bucket / level statistics through Avro so future schema
drift is caught
+ /// here, not in production. Matches the fields added in
apache/paimon#5345.
+ #[tokio::test]
+ async fn test_manifest_list_roundtrip_preserves_bucket_level_stats() {
+ let file_io = test_file_io();
+ let path = "memory:/test_manifest_list_bucket_level/manifest-list-0";
+ file_io
+ .mkdirs("memory:/test_manifest_list_bucket_level/")
+ .await
+ .unwrap();
+
+ let value_bytes = vec![
+ 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0,
0, 0, 0, 0, 0, 129,
+ ];
+ let original = vec![ManifestFileMeta::new(
+ "manifest-bucket-level".to_string(),
+ 4096,
+ 3,
+ 0,
+ BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(),
vec![Some(0)]),
+ 0,
+ )
+ .with_bucket_level_stats(Some(-1), Some(7), Some(0), Some(5))];
+
+ ManifestList::write(&file_io, path, &original)
+ .await
+ .unwrap();
+ let decoded = ManifestList::read(&file_io, path).await.unwrap();
+ assert_eq!(decoded.len(), 1);
+ assert_eq!(decoded[0].min_bucket(), Some(-1));
+ assert_eq!(decoded[0].max_bucket(), Some(7));
+ assert_eq!(decoded[0].min_level(), Some(0));
+ assert_eq!(decoded[0].max_level(), Some(5));
+ // Sanity: nothing else changed.
+ assert_eq!(decoded[0].file_name(), "manifest-bucket-level");
+ assert_eq!(decoded[0].num_added_files(), 3);
+ }
+
+ /// Back-compat: a manifest list written without the bucket / level fields
(e.g. by an
+ /// older Rust writer or any Java writer pre apache/paimon#5345) must
decode into
+ /// `None` rather than failing or yielding bogus values.
+ #[tokio::test]
+ async fn test_manifest_list_decodes_legacy_without_bucket_level_fields() {
+ use apache_avro::{Codec, Schema, Writer};
+ use std::collections::HashMap;
+
+ let file_io = test_file_io();
+ let path = "memory:/test_manifest_list_legacy/manifest-list-0";
+ file_io
+ .mkdirs("memory:/test_manifest_list_legacy/")
+ .await
+ .unwrap();
+
+ // Avro schema with the pre-5345 shape: no _MIN/_MAX_BUCKET/LEVEL
fields.
+ let legacy_schema = r#"["null", {
+ "type": "record",
+ "name": "record",
+ "namespace": "org.apache.paimon.avro.generated",
+ "fields": [
+ {"name": "_VERSION", "type": "int"},
+ {"name": "_FILE_NAME", "type": "string"},
+ {"name": "_FILE_SIZE", "type": "long"},
+ {"name": "_NUM_ADDED_FILES", "type": "long"},
+ {"name": "_NUM_DELETED_FILES", "type": "long"},
+ {"name": "_PARTITION_STATS", "type": ["null", {
+ "type": "record",
+ "name": "record__PARTITION_STATS",
+ "fields": [
+ {"name": "_MIN_VALUES", "type": "bytes"},
+ {"name": "_MAX_VALUES", "type": "bytes"},
+ {"name": "_NULL_COUNTS", "type": ["null", {"type":
"array", "items": ["null", "long"]}], "default": null}
+ ]
+ }], "default": null}
+ ]
+ }]"#;
+ let schema = Schema::parse_str(legacy_schema).unwrap();
+ let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+ let value_bytes = vec![0u8; 12];
+ let mut record: HashMap<String, apache_avro::types::Value> =
HashMap::new();
+ record.insert("_VERSION".to_string(),
apache_avro::types::Value::Int(2));
+ record.insert(
+ "_FILE_NAME".to_string(),
+ apache_avro::types::Value::String("manifest-legacy".to_string()),
+ );
+ record.insert(
+ "_FILE_SIZE".to_string(),
+ apache_avro::types::Value::Long(1024),
+ );
+ record.insert(
+ "_NUM_ADDED_FILES".to_string(),
+ apache_avro::types::Value::Long(2),
+ );
+ record.insert(
+ "_NUM_DELETED_FILES".to_string(),
+ apache_avro::types::Value::Long(0),
+ );
+ record.insert(
+ "_PARTITION_STATS".to_string(),
+ apache_avro::types::Value::Union(
+ 1,
+ Box::new(apache_avro::types::Value::Record(vec![
+ (
+ "_MIN_VALUES".to_string(),
+ apache_avro::types::Value::Bytes(value_bytes.clone()),
+ ),
+ (
+ "_MAX_VALUES".to_string(),
+ apache_avro::types::Value::Bytes(value_bytes.clone()),
+ ),
+ (
+ "_NULL_COUNTS".to_string(),
+ apache_avro::types::Value::Union(
+ 0,
+ Box::new(apache_avro::types::Value::Null),
+ ),
+ ),
+ ])),
+ ),
+ );
+ let value = apache_avro::types::Value::Union(
+ 1,
+ Box::new(apache_avro::types::Value::Record(
+ record.into_iter().collect(),
+ )),
+ );
+ let resolved = value.resolve(&schema).unwrap();
+ writer.append(resolved).unwrap();
+ let bytes = writer.into_inner().unwrap();
+ file_io
+ .new_output(path)
+ .unwrap()
+ .write(bytes::Bytes::from(bytes))
+ .await
+ .unwrap();
+
+ let decoded = ManifestList::read(&file_io, path).await.unwrap();
+ assert_eq!(decoded.len(), 1);
+ assert_eq!(decoded[0].file_name(), "manifest-legacy");
+ assert_eq!(decoded[0].min_bucket(), None);
+ assert_eq!(decoded[0].max_bucket(), None);
+ assert_eq!(decoded[0].min_level(), None);
+ assert_eq!(decoded[0].max_level(), None);
+ }
}
diff --git a/crates/paimon/src/spec/objects_file.rs
b/crates/paimon/src/spec/objects_file.rs
index 9056a2e..81e4377 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -83,6 +83,10 @@ mod tests {
0,
BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(),
vec![Some(1)]),
0,
+ None,
+ None,
+ None,
+ None,
Some(100),
Some(199),
)];
diff --git a/crates/paimon/src/table/table_commit.rs
b/crates/paimon/src/table/table_commit.rs
index 664b654..274dd96 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -520,11 +520,24 @@ impl TableCommit {
let mut added_file_count: i64 = 0;
let mut deleted_file_count: i64 = 0;
+ // Bucket / level pruning stats; left as None when entries is empty so
back-compat
+ // readers (Java < apache/paimon#5345 or older Rust readers) see the
same shape
+ // they would for a pre-feature manifest.
+ let mut min_bucket: Option<i32> = None;
+ let mut max_bucket: Option<i32> = None;
+ let mut min_level: Option<i32> = None;
+ let mut max_level: Option<i32> = None;
for entry in entries {
match entry.kind() {
FileKind::Add => added_file_count += 1,
FileKind::Delete => deleted_file_count += 1,
}
+ let b = entry.bucket();
+ min_bucket = Some(min_bucket.map_or(b, |cur| cur.min(b)));
+ max_bucket = Some(max_bucket.map_or(b, |cur| cur.max(b)));
+ let l = entry.file().level;
+ min_level = Some(min_level.map_or(l, |cur| cur.min(l)));
+ max_level = Some(max_level.map_or(l, |cur| cur.max(l)));
}
// Get file size
@@ -539,7 +552,8 @@ impl TableCommit {
deleted_file_count,
partition_stats,
self.table.schema().id(),
- ))
+ )
+ .with_bucket_level_stats(min_bucket, max_bucket, min_level, max_level))
}
/// Check if this commit was already completed (idempotency).
@@ -1822,4 +1836,44 @@ mod tests {
"Expected 'Delete conflict' error, got: {err_msg}"
);
}
+
+ /// `write_manifest_file` must aggregate min/max bucket and level across
entries so the
+ /// Java reader can prune manifests by bucket / level (see
apache/paimon#5345). This
+ /// drives a real commit so all the call-site plumbing is exercised end to
end.
+ #[tokio::test]
+ async fn test_commit_writes_bucket_and_level_stats_into_manifest_list() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_commit_bucket_level_stats";
+ setup_dirs(&file_io, table_path).await;
+
+ let commit = setup_commit(&file_io, table_path);
+
+ fn data_file_at_level(name: &str, level: i32) -> DataFileMeta {
+ let mut f = test_data_file(name, 1);
+ f.level = level;
+ f
+ }
+
+ // Two commit messages on different buckets, each carrying a file at a
different
+ // level. Expected aggregate: bucket [0, 3], level [0, 2].
+ let messages = vec![
+ CommitMessage::new(vec![], 0,
vec![data_file_at_level("data-b0.parquet", 0)]),
+ CommitMessage::new(vec![], 3,
vec![data_file_at_level("data-b3.parquet", 2)]),
+ ];
+ commit.commit(messages).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io.clone(),
table_path.to_string());
+ let snapshot =
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ let delta_path = format!("{table_path}/manifest/{}",
snapshot.delta_manifest_list());
+ let metas = ManifestList::read(&file_io, &delta_path).await.unwrap();
+ assert_eq!(
+ metas.len(),
+ 1,
+ "expected a single manifest covering both entries"
+ );
+ assert_eq!(metas[0].min_bucket(), Some(0));
+ assert_eq!(metas[0].max_bucket(), Some(3));
+ assert_eq!(metas[0].min_level(), Some(0));
+ assert_eq!(metas[0].max_level(), Some(2));
+ }
}