This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c9ddccf  fix(scan): net manifest add/delete by full file identity incl. level (#377)
c9ddccf is described below

commit c9ddccf3e04d776004c18d2475154be3d63c984f
Author: chaoyang <[email protected]>
AuthorDate: Thu Jun 11 15:38:31 2026 +0800

    fix(scan): net manifest add/delete by full file identity incl. level (#377)
    
    merge_manifest_entries cancelled add/delete pairs by
    (partition, bucket, file_name), omitting level. A single-run compaction
    upgrades a file in place (DELETE f@oldLevel + ADD f@newLevel, same name),
    so the delete also cancelled the upgraded add, dropping the live file from
    the scan and silently losing its rows on read.
    
    Rewrite it to mirror Java AbstractFileStoreScan.readAndMergeFileEntries:
    collect every DELETE's full Identifier (incl. level), then keep ADDs whose
    identifier is not deleted. Also align Identifier's Hash with its Eq (and
    Java FileEntry.Identifier.hashCode) so in-place upgraded files no longer
    all collide in the dedup set.
---
 crates/paimon/src/spec/manifest_entry.rs | 52 ++++++++++++++++++++
 crates/paimon/src/table/table_scan.rs    | 84 +++++++++++++++++++++-----------
 2 files changed, 108 insertions(+), 28 deletions(-)

diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs
index 1d02fb7..194fc31 100644
--- a/crates/paimon/src/spec/manifest_entry.rs
+++ b/crates/paimon/src/spec/manifest_entry.rs
@@ -35,10 +35,19 @@ pub struct Identifier {
 }
 
 impl Hash for Identifier {
+    // Hash every field that participates in `Eq`, matching Java
+    // `FileEntry.Identifier.hashCode`. `level` in particular must be included:
+    // a single-run compaction upgrades a file in place (same name, new level),
+    // producing distinct identifiers that would otherwise all collide and
+    // degrade the hash sets used to net add/delete entries during scan.
     fn hash<H: Hasher>(&self, state: &mut H) {
         self.partition.hash(state);
         self.bucket.hash(state);
+        self.level.hash(state);
         self.file_name.hash(state);
+        self.extra_files.hash(state);
+        self.embedded_index.hash(state);
+        self.external_path.hash(state);
     }
 }
 
@@ -226,3 +235,46 @@ pub const MANIFEST_ENTRY_SCHEMA: &str = r#"["null", {
         {"name": "_VERSION", "type": "int"}
     ]
 }]"#;
+
+#[cfg(test)]
+mod tests {
+    use super::Identifier;
+    use std::collections::HashSet;
+
+    fn ident(file_name: &str, level: i32) -> Identifier {
+        Identifier {
+            partition: vec![1, 2, 3],
+            bucket: 0,
+            level,
+            file_name: file_name.to_string(),
+            extra_files: Vec::new(),
+            embedded_index: None,
+            external_path: None,
+        }
+    }
+
+    /// A single-run compaction upgrades a file in place: same name, new level.
+    /// The two identifiers must be distinct (Eq) and must not collapse together
+    /// in a HashSet (Hash), otherwise netting add/delete by identifier would
+    /// drop the upgraded file. Guards both the `Eq` and the `Hash` impl.
+    #[test]
+    fn test_identifier_distinguishes_in_place_level_upgrade() {
+        let l0 = ident("f.parquet", 0);
+        let l5 = ident("f.parquet", 5);
+
+        assert_ne!(l0, l5, "same name at different levels are different files");
+
+        let mut set = HashSet::new();
+        set.insert(l0.clone());
+        assert!(
+            !set.contains(&l5),
+            "upgraded file must not alias the old one"
+        );
+        set.insert(l5.clone());
+        assert_eq!(set.len(), 2, "both levels coexist as distinct identifiers");
+
+        // Equal identifiers must hash/compare equal (set dedups them).
+        set.insert(ident("f.parquet", 5));
+        assert_eq!(set.len(), 2, "equal identifiers dedup");
+    }
+}
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index be0d2df..273a5c9 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -236,39 +236,32 @@ fn build_deletion_files_map(
     map
 }
 
-/// Merges add/delete manifest entries following pypaimon's `adds - deletes` behavior.
+/// Nets add/delete manifest entries for a scan, returning only the live ADD set.
 ///
-/// The identifier must be rich enough to match Paimon's file identity, otherwise a delete
-/// for one file version can incorrectly remove another with the same file name.
+/// Mirrors Java `AbstractFileStoreScan.readAndMergeFileEntries`: first collect
+/// the full [`Identifier`] of every DELETE entry, then keep the ADD entries
+/// whose identifier is not in that set. The identity is the complete Paimon file
+/// identity (`partition, bucket, level, file_name, extra_files, embedded_index,
+/// external_path`, matching Java `FileEntry.Identifier`).
+///
+/// Keying on file name alone is wrong: a single-run compaction upgrades a file
+/// *in place* — `DELETE f@oldLevel` plus `ADD f@newLevel` with the same file
+/// name (`PojoDataFileMeta.upgrade` reuses the name, only changing `level`). An
+/// identity without `level` lets the DELETE cancel the upgraded ADD, dropping
+/// the file from the scan and silently losing its rows on read.
+///
+/// Collecting deletes first (rather than insert/remove while iterating) makes
+/// the result independent of ADD/DELETE ordering, matching the Java scan path.
 fn merge_manifest_entries(entries: Vec<ManifestEntry>) -> Vec<ManifestEntry> {
-    let mut delete_entries = Vec::with_capacity(entries.len() / 4);
-    let mut added_entries = Vec::with_capacity(entries.len());
-
-    for entry in entries {
-        match entry.kind() {
-            FileKind::Add => added_entries.push(entry),
-            FileKind::Delete => delete_entries.push(entry),
-        }
-    }
-
-    if delete_entries.is_empty() {
-        return added_entries;
-    }
-
-    let deleted_keys: HashSet<(&[u8], i32, &str)> = delete_entries
+    use crate::spec::Identifier;
+    let deleted: HashSet<Identifier> = entries
         .iter()
-        .map(|e| (e.partition(), e.bucket(), e.file().file_name.as_str()))
+        .filter(|e| *e.kind() == FileKind::Delete)
+        .map(|e| e.identifier())
         .collect();
-
-    added_entries
+    entries
         .into_iter()
-        .filter(|entry| {
-            !deleted_keys.contains(&(
-                entry.partition(),
-                entry.bucket(),
-                entry.file().file_name.as_str(),
-            ))
-        })
+        .filter(|e| *e.kind() == FileKind::Add && !deleted.contains(&e.identifier()))
         .collect()
 }
 
@@ -829,6 +822,41 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_merge_manifest_entries_keeps_in_place_upgraded_file() {
+        // Reproduces a single-run compaction "upgrade": the SAME file name is
+        // deleted at level 0 and re-added at a higher level (Paimon promotes a
+        // lone sorted run in place instead of rewriting it). Netting ADD/DELETE
+        // by file name alone (ignoring `level`) wrongly drops the upgraded file.
+        // Matches Java `FileEntry.Identifier`, which includes `level`.
+        use super::merge_manifest_entries;
+        use crate::spec::ManifestEntry;
+
+        let entry = |kind: FileKind, name: &str, level: i32| -> ManifestEntry {
+            let mut file = make_evo_file(name, 1, 1, 1, None);
+            file.level = level;
+            ManifestEntry::new(kind, Vec::new(), 0, 1, file, 2)
+        };
+
+        let entries = vec![
+            entry(FileKind::Add, "f.parquet", 0), // original level-0 write
+            entry(FileKind::Delete, "f.parquet", 0), // compaction removes the L0 version
+            entry(FileKind::Add, "f.parquet", 5), // same file upgraded to level 5
+            entry(FileKind::Add, "g.parquet", 0), // unrelated fresh file
+        ];
+
+        let mut live: Vec<(String, i32)> = merge_manifest_entries(entries)
+            .into_iter()
+            .map(|e| (e.file().file_name.clone(), e.file().level))
+            .collect();
+        live.sort();
+        assert_eq!(
+            live,
+            vec![("f.parquet".to_string(), 5), ("g.parquet".to_string(), 0)],
+            "upgraded file (f@L5) must survive; only f@L0 is cancelled by the DELETE"
+        );
+        );
+    }
+
     fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
         groups
             .iter()

Reply via email to