This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c9ddccf fix(scan): net manifest add/delete by full file identity incl. level (#377)
c9ddccf is described below
commit c9ddccf3e04d776004c18d2475154be3d63c984f
Author: chaoyang <[email protected]>
AuthorDate: Thu Jun 11 15:38:31 2026 +0800
fix(scan): net manifest add/delete by full file identity incl. level (#377)
merge_manifest_entries cancelled add/delete pairs by
(partition, bucket, file_name), omitting level. A single-run compaction
upgrades a file in place (DELETE f@oldLevel + ADD f@newLevel, same name),
so the delete also cancelled the upgraded add, dropping the live file from
the scan and silently losing its rows on read.
Rewrite it to mirror Java AbstractFileStoreScan.readAndMergeFileEntries:
collect every DELETE's full Identifier (incl. level), then keep ADDs whose
identifier is not deleted. Also align Identifier's Hash with its Eq (and
Java FileEntry.Identifier.hashCode) so identifiers that differ only in level
(e.g. a file and its in-place upgrade) no longer collide in the hash sets
used for netting.
---
crates/paimon/src/spec/manifest_entry.rs | 52 ++++++++++++++++++++
crates/paimon/src/table/table_scan.rs | 84 +++++++++++++++++++++-----------
2 files changed, 108 insertions(+), 28 deletions(-)
diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs
index 1d02fb7..194fc31 100644
--- a/crates/paimon/src/spec/manifest_entry.rs
+++ b/crates/paimon/src/spec/manifest_entry.rs
@@ -35,10 +35,19 @@ pub struct Identifier {
}
impl Hash for Identifier {
+ // Hash every field that participates in `Eq`, matching Java
+ // `FileEntry.Identifier.hashCode`. Hashing only a subset would still be
+ // consistent with `Eq`, but identifiers differing only in an omitted field
+ // would share one hash. `level` matters most: a single-run compaction
+ // upgrades a file in place (same name, new level), and the old and upgraded
+ // identifiers should not collide in the hash sets used to net add/delete
+ // entries during scan.
fn hash<H: Hasher>(&self, state: &mut H) {
self.partition.hash(state);
self.bucket.hash(state);
+ self.level.hash(state);
self.file_name.hash(state);
+ self.extra_files.hash(state);
+ self.embedded_index.hash(state);
+ self.external_path.hash(state);
}
}
@@ -226,3 +235,46 @@ pub const MANIFEST_ENTRY_SCHEMA: &str = r#"["null", {
{"name": "_VERSION", "type": "int"}
]
}]"#;
+
+#[cfg(test)]
+mod tests {
+ use super::Identifier;
+ use std::collections::hash_map::DefaultHasher;
+ use std::collections::HashSet;
+ use std::hash::{Hash, Hasher};
+
+ fn ident(file_name: &str, level: i32) -> Identifier {
+ Identifier {
+ partition: vec![1, 2, 3],
+ bucket: 0,
+ level,
+ file_name: file_name.to_string(),
+ extra_files: Vec::new(),
+ embedded_index: None,
+ external_path: None,
+ }
+ }
+
+ /// Hashes `id` with the std default hasher. Every `DefaultHasher::new()`
+ /// instance hashes identically, so results are directly comparable.
+ fn hash_of(id: &Identifier) -> u64 {
+ let mut hasher = DefaultHasher::new();
+ id.hash(&mut hasher);
+ hasher.finish()
+ }
+
+ /// A single-run compaction upgrades a file in place: same name, new level.
+ /// The two identifiers must be distinct (`Eq`), otherwise netting add/delete
+ /// by identifier drops the upgraded file, and should hash differently
+ /// (`Hash`), otherwise they collide in the netting hash sets. `HashSet`
+ /// lookups alone cannot catch a field missing from `Hash` (`Eq` still
+ /// disambiguates colliding keys), so the hashes are compared directly.
+ #[test]
+ fn test_identifier_distinguishes_in_place_level_upgrade() {
+ let l0 = ident("f.parquet", 0);
+ let l5 = ident("f.parquet", 5);
+
+ assert_ne!(l0, l5, "same name at different levels are different files");
+ assert_ne!(hash_of(&l0), hash_of(&l5), "level must feed the hash");
+
+ let mut set = HashSet::new();
+ set.insert(l0.clone());
+ assert!(
+ !set.contains(&l5),
+ "upgraded file must not alias the old one"
+ );
+ set.insert(l5.clone());
+ assert_eq!(set.len(), 2, "both levels coexist as distinct identifiers");
+
+ // Equal identifiers must hash/compare equal (set dedups them).
+ set.insert(ident("f.parquet", 5));
+ assert_eq!(set.len(), 2, "equal identifiers dedup");
+ }
+}
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index be0d2df..273a5c9 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -236,39 +236,32 @@ fn build_deletion_files_map(
map
}
-/// Merges add/delete manifest entries following pypaimon's `adds - deletes` behavior.
+/// Nets add/delete manifest entries for a scan, returning only the live ADD set.
///
-/// The identifier must be rich enough to match Paimon's file identity, otherwise a delete
-/// for one file version can incorrectly remove another with the same file name.
+/// Mirrors Java `AbstractFileStoreScan.readAndMergeFileEntries`: first collect
+/// the full [`Identifier`] of every DELETE entry, then keep the ADD entries
+/// whose identifier is not in that set. The identity is the complete Paimon file
+/// identity (`partition, bucket, level, file_name, extra_files, embedded_index,
+/// external_path`, matching Java `FileEntry.Identifier`).
+///
+/// Keying on file name alone is wrong: a single-run compaction upgrades a file
+/// *in place* — `DELETE f@oldLevel` plus `ADD f@newLevel` with the same file
+/// name (`PojoDataFileMeta.upgrade` reuses the name, only changing `level`). An
+/// identity without `level` lets the DELETE cancel the upgraded ADD, dropping
+/// the file from the scan and silently losing its rows on read.
+///
+/// Collecting deletes first (rather than insert/remove while iterating) makes
+/// the result independent of ADD/DELETE ordering, matching the Java scan path.
+///
+/// When there are no DELETEs every ADD is live, so ADDs are returned without
+/// building an owned `Identifier` for each one.
fn merge_manifest_entries(entries: Vec<ManifestEntry>) -> Vec<ManifestEntry> {
- let mut delete_entries = Vec::with_capacity(entries.len() / 4);
- let mut added_entries = Vec::with_capacity(entries.len());
-
- for entry in entries {
- match entry.kind() {
- FileKind::Add => added_entries.push(entry),
- FileKind::Delete => delete_entries.push(entry),
- }
- }
-
- if delete_entries.is_empty() {
- return added_entries;
- }
-
- let deleted_keys: HashSet<(&[u8], i32, &str)> = delete_entries
+ use crate::spec::Identifier;
+ let deleted: HashSet<Identifier> = entries
.iter()
- .map(|e| (e.partition(), e.bucket(), e.file().file_name.as_str()))
+ .filter(|e| *e.kind() == FileKind::Delete)
+ .map(|e| e.identifier())
.collect();
-
- added_entries
+ // Fast path: `identifier()` clones partition/file-name data, so skip it
+ // entirely when there is nothing to cancel (the common append-only case).
+ let no_deletes = deleted.is_empty();
+ entries
.into_iter()
- .filter(|entry| {
- !deleted_keys.contains(&(
- entry.partition(),
- entry.bucket(),
- entry.file().file_name.as_str(),
- ))
- })
+ .filter(|e| {
+ *e.kind() == FileKind::Add && (no_deletes || !deleted.contains(&e.identifier()))
+ })
.collect()
}
@@ -829,6 +822,41 @@ mod tests {
}
}
+ #[test]
+ fn test_merge_manifest_entries_keeps_in_place_upgraded_file() {
+ // Reproduces a single-run compaction "upgrade": the SAME file name is
+ // deleted at level 0 and re-added at a higher level (Paimon promotes a
+ // lone sorted run in place instead of rewriting it). Netting ADD/DELETE
+ // by file name alone (ignoring `level`) wrongly drops the upgraded file.
+ // Matches Java `FileEntry.Identifier`, which includes `level`.
+ use super::merge_manifest_entries;
+ use crate::spec::ManifestEntry;
+
+ let entry = |kind: FileKind, name: &str, level: i32| -> ManifestEntry {
+ let mut file = make_evo_file(name, 1, 1, 1, None);
+ file.level = level;
+ ManifestEntry::new(kind, Vec::new(), 0, 1, file, 2)
+ };
+ // Runs the merge and returns the surviving (file_name, level) pairs, sorted.
+ let live_files = |entries: Vec<ManifestEntry>| -> Vec<(String, i32)> {
+ let mut live: Vec<(String, i32)> = merge_manifest_entries(entries)
+ .into_iter()
+ .map(|e| (e.file().file_name.clone(), e.file().level))
+ .collect();
+ live.sort();
+ live
+ };
+ let expected = vec![("f.parquet".to_string(), 5), ("g.parquet".to_string(), 0)];
+
+ let entries = vec![
+ entry(FileKind::Add, "f.parquet", 0), // original level-0 write
+ entry(FileKind::Delete, "f.parquet", 0), // compaction removes the L0 version
+ entry(FileKind::Add, "f.parquet", 5), // same file upgraded to level 5
+ entry(FileKind::Add, "g.parquet", 0), // unrelated fresh file
+ ];
+ assert_eq!(
+ live_files(entries),
+ expected,
+ "upgraded file (f@L5) must survive; only f@L0 is cancelled by the DELETE"
+ );
+
+ // Netting must not depend on manifest order: a DELETE listed before the
+ // ADD it cancels must still cancel it.
+ let reordered = vec![
+ entry(FileKind::Delete, "f.parquet", 0),
+ entry(FileKind::Add, "g.parquet", 0),
+ entry(FileKind::Add, "f.parquet", 5),
+ entry(FileKind::Add, "f.parquet", 0),
+ ];
+ assert_eq!(
+ live_files(reordered),
+ expected,
+ "netting must be independent of ADD/DELETE order"
+ );
+ }
+
fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
groups
.iter()