jerry-024 commented on code in PR #254:
URL: https://github.com/apache/paimon-rust/pull/254#discussion_r3098158251


##########
crates/paimon/src/table/table_scan.rs:
##########
@@ -474,25 +474,24 @@ impl<'a> TableScan<'a> {
         let file_io = self.table.file_io();
         let table_path = self.table.location();
         let core_options = CoreOptions::new(self.table.schema().options());
-        let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
         let data_evolution_enabled = core_options.data_evolution_enabled();
 
-        let has_primary_keys = !self.table.schema().primary_keys().is_empty();
+        let has_primary_keys = !self.table.schema().trimmed_primary_keys().is_empty();
 
-        // Skip level-0 files for PK tables when:
-        // - DV mode: level-0 files are unmerged, DV handles dedup at higher levels
-        // - FirstRow engine without DV: reads go through DataFileReader (no merge),
-        //   so only compacted (level > 0) files are safe to read directly
+        // Skip level-0 files for PK tables with FirstRow engine only.
+        // FirstRow reads go through DataFileReader (no merge), so only compacted
+        // (level > 0) files are safe to read directly.
+        // Deduplicate engine always uses KeyValueFileReader which handles level-0
+        // via sort-merge, so level-0 files must remain visible.
         //
         // Non-read paths (overwrite, truncate, writer restore) set scan_all_files=true
         // to see all files including level-0, matching Java's CommitScanner behavior.
         let skip_level_zero = if self.scan_all_files {
             false
         } else if has_primary_keys {
-            deletion_vectors_enabled
-                || core_options
-                    .merge_engine()
-                    .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)
+            core_options
+                .merge_engine()
+                .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)

Review Comment:
   **Must-fix (P1, read regression): DV guard removed.**
   
   Previous `skip_level_zero` covered both `deletion_vectors_enabled` and 
`FirstRow`. After this PR, DV mode no longer skips level-0, so splits with 
level-0 files stay visible.
   
   `table_read.rs:3832-3838` routes PK + Deduplicate + any level-0 split into 
`read_kv()` → `KeyValueFileReader`, and `kv_file_reader.rs:250-259` explicitly 
errors when `data_deletion_files` contains any `Some`:
   
   ```rust
   Err(Error::UnexpectedError {
       message: "KeyValueFileReader does not support deletion vectors".to_string(),
       ...
   })
   ```
   
   So DV + Deduplicate reads now fail outright as soon as any level-0 file 
exists, rather than merely returning stale rows. Options:
   
   1. Restore the DV guard here (and in `table_read.rs`): keep DV tables on the 
`DataFileReader` / raw path.
   2. Or teach `KeyValueFileReader` to apply DV (pass deletion files through 
`KeyValueReadConfig`).
   
   Option 1 is the smaller change if DV + Deduplicate is not yet a design goal. 
Either way, please add a DV + Deduplicate + level-0 read test.
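
   A minimal sketch of option 1, for illustration only. `MergeEngine` and 
`ScanFlags` below are hypothetical stand-ins for `crate::spec::MergeEngine` and 
the values the hunk reads from `self` / `core_options`. The sketch also assumes 
non-PK tables never skip level-0, since the final `else` branch is not visible 
in this hunk.

```rust
/// Hypothetical stand-in for `crate::spec::MergeEngine`; only two variants are modeled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeEngine {
    Deduplicate,
    FirstRow,
}

/// Hypothetical bundle of the values the quoted hunk reads before deciding.
#[derive(Debug, Clone, Copy)]
pub struct ScanFlags {
    pub scan_all_files: bool,
    pub has_primary_keys: bool,
    pub deletion_vectors_enabled: bool,
    pub merge_engine: MergeEngine,
}

/// Option 1: DV tables keep skipping level-0 files, so DV splits never reach
/// a reader that rejects deletion vectors.
pub fn skip_level_zero(flags: ScanFlags) -> bool {
    if flags.scan_all_files {
        // Non-read paths (overwrite, truncate, writer restore) must see every file.
        return false;
    }
    // Assumption: tables without primary keys never skip level-0.
    flags.has_primary_keys
        && (flags.deletion_vectors_enabled || flags.merge_engine == MergeEngine::FirstRow)
}
```

   The same four combinations (scan-all, DV on, DV off + Deduplicate, DV off + 
FirstRow) would be a reasonable starting matrix for the requested read test.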



##########
crates/paimon/src/table/table_write.rs:
##########
@@ -125,26 +127,33 @@ impl TableWrite {
         }
 
         let total_buckets = core_options.bucket();
-        let has_primary_keys = !schema.primary_keys().is_empty();
-
-        if has_primary_keys {
-            if total_buckets < 1 && total_buckets != POSTPONE_BUCKET {
-                return Err(crate::Error::Unsupported {
-                    message: format!(
-                        "KeyValueFileWriter does not support bucket={total_buckets}, only fixed bucket (>= 1) or postpone bucket (= -2) is supported"
-                    ),
-                });
-            }
-            if total_buckets != POSTPONE_BUCKET
-                && core_options
-                    .changelog_producer()
-                    .eq_ignore_ascii_case("input")
-            {
-                return Err(crate::Error::Unsupported {
-                    message: "KeyValueFileWriter does not support changelog-producer=input"
-                        .to_string(),
-                });
-            }
+        let has_primary_keys = !schema.trimmed_primary_keys().is_empty();
+        let is_dynamic_bucket = has_primary_keys && total_buckets == -1;
+
+        let is_cross_partition = is_dynamic_bucket
+            && !schema.partition_keys().is_empty()
+            && schema.trimmed_primary_keys().len() == schema.primary_keys().len();

Review Comment:
   **Must-fix (P1, stale data): cross-partition detection is too narrow.**
   
   `trimmed_primary_keys().len() == primary_keys().len()` is equivalent to "PK 
and partition columns do not overlap at all". Java Paimon triggers 
cross-partition update whenever `!primaryKeys.containsAll(partitionKeys)`, i.e. 
any partition column outside the PK.
   
   Counter-example that this branch misses: `PARTITIONED BY (pt1, pt2) + 
PRIMARY KEY (pt1, id)`. Here `trimmed=[id]` (len 1), `primary_keys=[pt1, id]` 
(len 2), so `1 != 2` → routed to `DynamicBucketAssigner`. When a record moves 
across `pt2` values, the PK lookup in the new partition does not see the old 
row, no old-partition DELETE is emitted, and the old partition keeps stale data 
forever.
   
   Suggest:
   
   ```rust
   let pk_set: HashSet<_> = schema.primary_keys().iter().collect();
   let partition_not_in_pk = schema.partition_keys().iter().any(|p| !pk_set.contains(p));
   let is_cross_partition = is_dynamic_bucket && partition_not_in_pk;
   ```
   
   Please add a regression test: `PARTITIONED BY (pt1, pt2) + PK (pt1, id)`, 
write same `id` under different `pt2`, assert old partition has a DELETE record 
(or assert post-commit row count is 1).
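
   To make the counter-example concrete, here is a self-contained sketch that 
compares the two predicates. It works on plain string slices rather than 
`TableSchema`, the function names are hypothetical, and both predicates assume 
`is_dynamic_bucket` is already true.

```rust
use std::collections::HashSet;

/// Primary keys with partition columns removed (same idea as the PR's
/// `trimmed_primary_keys`, but on plain slices).
pub fn trimmed_primary_keys(primary_keys: &[&str], partition_keys: &[&str]) -> Vec<String> {
    let partitions: HashSet<&str> = partition_keys.iter().copied().collect();
    primary_keys
        .iter()
        .copied()
        .filter(|pk| !partitions.contains(*pk))
        .map(String::from)
        .collect()
}

/// The check in this PR: true only when PK and partition columns are disjoint.
pub fn is_cross_partition_by_len(primary_keys: &[&str], partition_keys: &[&str]) -> bool {
    !partition_keys.is_empty()
        && trimmed_primary_keys(primary_keys, partition_keys).len() == primary_keys.len()
}

/// The suggested check: true when any partition column is missing from the PK.
pub fn is_cross_partition_by_subset(primary_keys: &[&str], partition_keys: &[&str]) -> bool {
    let pk_set: HashSet<&str> = primary_keys.iter().copied().collect();
    partition_keys.iter().any(|p| !pk_set.contains(*p))
}
```

   For `PARTITIONED BY (pt1, pt2) + PRIMARY KEY (pt1, id)` the length-based 
check returns `false` while the subset-based check returns `true`; the two 
agree when the PK and partition columns are disjoint or when the partition 
columns are fully contained in the PK.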



##########
crates/paimon/src/table/table_write.rs:
##########
@@ -125,26 +127,33 @@ impl TableWrite {
         }
 
         let total_buckets = core_options.bucket();
-        let has_primary_keys = !schema.primary_keys().is_empty();
-
-        if has_primary_keys {
-            if total_buckets < 1 && total_buckets != POSTPONE_BUCKET {
-                return Err(crate::Error::Unsupported {
-                    message: format!(
-                        "KeyValueFileWriter does not support bucket={total_buckets}, only fixed bucket (>= 1) or postpone bucket (= -2) is supported"
-                    ),
-                });
-            }
-            if total_buckets != POSTPONE_BUCKET
-                && core_options
-                    .changelog_producer()
-                    .eq_ignore_ascii_case("input")
-            {
-                return Err(crate::Error::Unsupported {
-                    message: "KeyValueFileWriter does not support changelog-producer=input"
-                        .to_string(),
-                });
-            }
+        let has_primary_keys = !schema.trimmed_primary_keys().is_empty();

Review Comment:
   **Must-fix (P1): `has_primary_keys` is wrong when `PK == partition_keys`.**
   
   Switching the definition to `trimmed_primary_keys()` causes tables where `PK 
== partition_keys` (a legal schema in Paimon — `partition_keys ⊆ primary_keys` 
is the only constraint, and equality is allowed) to be treated as append-only. 
Because `primary_key_indices` at line 192 is also derived from 
`trimmed_primary_keys()`, `create_writer` at line 521 sees 
`self.primary_key_indices.is_empty()` and picks `create_append_writer`. PK 
semantics (dedup, merge) are silently dropped.
   
   Suggest separating the two concepts:
   
   ```rust
   let has_primary_keys = !schema.primary_keys().is_empty();
   // keep trimmed_primary_keys() only for building KV key indices below
   ```
   
   Please add a regression test for `PK == partition_keys` writes: insert 
duplicates within the same partition, assert the table dedupes.



##########
crates/paimon/src/spec/schema.rs:
##########
@@ -98,6 +98,22 @@ impl TableSchema {
         &self.primary_keys
     }
 
+    /// Primary keys with partition columns removed.
+    ///
+    /// Within a single partition the partition columns are constant, so they
+    /// are redundant in the KV key. Java Paimon calls these "trimmed primary keys".
+    pub fn trimmed_primary_keys(&self) -> Vec<String> {
+        if self.partition_keys.is_empty() {
+            return self.primary_keys.clone();
+        }
+        let partition_set: HashSet<&str> = self.partition_keys.iter().map(String::as_str).collect();
+        self.primary_keys
+            .iter()
+            .filter(|pk| !partition_set.contains(pk.as_str()))
+            .cloned()
+            .collect()
+    }

Review Comment:
   **Must-fix (P1, silent data loss).** The real target is 
`TableSchema::bucket_keys()` at line 145 of this file (outside this hunk, so 
leaving the comment here).
   
   `bucket_keys()` still returns `self.primary_keys.clone()` when no explicit 
bucket key is set, but the read path added in this PR falls back to 
`trimmed_primary_keys()` in two places:
   - `crates/paimon/src/table/read_builder.rs:57-63` (`bucket_predicate`)
   - `crates/paimon/src/table/table_scan.rs:511-517` (`bucket_key_fields`)
   
   On a fixed-bucket partitioned PK table (`PARTITIONED BY (pt) + PK (pt, id)`, 
default bucket key), writes compute `hash([pt, id]) % N`, but a query `WHERE 
pt='a' AND id=5` computes the target bucket via `hash([id]) % N`. The target 
set at `table_scan.rs:181-190` then prunes the real bucket and the query 
silently returns zero rows.
   
   Java Paimon's `TableSchema.bucketKeys()` uses `trimmedPrimaryKeys()` in this 
fallback. Suggest changing line 145 to:
   
   ```rust
   if !self.primary_keys.is_empty() {
       return self.trimmed_primary_keys();
   }
   ```
   
   (Note: may return empty when `PK == partition_keys`; `FixedBucketAssigner` 
construction in `table_write.rs` already falls through to the constant-bucket-0 
branch when `bucket_key_indices.is_empty()`, so that case stays covered.)
   
   Please add a regression test: partitioned PK fixed-bucket table, insert 
data, query with both partition and PK columns in the predicate, assert row 
count > 0.
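
   A small sketch of the mismatch and of the suggested fallback. `Keys` is a 
hypothetical, simplified stand-in for the relevant `TableSchema` fields, and the 
two `bucket_keys_*` methods are illustrative names, not existing APIs.

```rust
/// Hypothetical, simplified view of the schema fields involved.
pub struct Keys {
    pub primary_keys: Vec<String>,
    pub partition_keys: Vec<String>,
    pub explicit_bucket_keys: Vec<String>,
}

impl Keys {
    /// Primary keys with partition columns removed; this is what the read path uses.
    pub fn trimmed_primary_keys(&self) -> Vec<String> {
        self.primary_keys
            .iter()
            .filter(|pk| !self.partition_keys.contains(pk))
            .cloned()
            .collect()
    }

    /// Fallback as described in the comment above: the full primary key list.
    pub fn bucket_keys_current(&self) -> Vec<String> {
        if !self.explicit_bucket_keys.is_empty() {
            return self.explicit_bucket_keys.clone();
        }
        self.primary_keys.clone()
    }

    /// Suggested fallback: the trimmed list, so writer and reader hash the same columns.
    pub fn bucket_keys_suggested(&self) -> Vec<String> {
        if !self.explicit_bucket_keys.is_empty() {
            return self.explicit_bucket_keys.clone();
        }
        self.trimmed_primary_keys()
    }
}
```

   With `PARTITIONED BY (pt) + PK (pt, id)` and no explicit bucket key, the 
current fallback yields `[pt, id]` while the reader hashes `[id]`; the suggested 
fallback makes both sides agree on `[id]`.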



##########
crates/paimon/src/spec/index_manifest.rs:
##########
@@ -25,10 +25,45 @@ use std::fmt::{Display, Formatter};
 
 use crate::Result;
 
+/// Avro schema for IndexManifestEntry OCF serialization.
+///
+/// Must match the serde layout of `IndexManifestEntry`.
+pub const INDEX_MANIFEST_ENTRY_SCHEMA: &str = r#"{
+    "type": "record",
+    "name": "org.apache.paimon.avro.generated.record",
+    "fields": [
+        {"name": "_VERSION", "type": "int"},
+        {"name": "_KIND", "type": "int"},
+        {"name": "_PARTITION", "type": "bytes"},
+        {"name": "_BUCKET", "type": "int"},
+        {"name": "_INDEX_TYPE", "type": "string"},
+        {"name": "_FILE_NAME", "type": "string"},
+        {"name": "_FILE_SIZE", "type": "long"},
+        {"name": "_ROW_COUNT", "type": "long"},

Review Comment:
   **Must-fix (P1): Avro schema is missing `_GLOBAL_INDEX`.**
   
   `IndexFileMeta` has a `global_index_meta: Option<GlobalIndexMeta>` field 
(with `skip_serializing_if = Option::is_none`), but this schema declares no 
`_GLOBAL_INDEX` field.
   
   For HASH-only entries this is fine (the field is always `None`). The problem 
is the new merge path in `table_commit.rs::write_index_manifest`: on 
APPEND/OVERWRITE it reads the previous index manifest, retains entries, and 
rewrites the merged list through `IndexManifest::write` with this schema. Any 
pre-existing entry carrying a non-null `global_index_meta` (e.g. DV, global 
index) either fails to re-serialize (depending on writer strictness) or 
silently drops the field on round-trip, corrupting the manifest.
   
   Suggest adding the field as optional, mirroring Java's 
`IndexFileMeta.schema()`:
   
   ```json
   {
     "name": "_GLOBAL_INDEX",
     "type": ["null", { ...GlobalIndexMeta record... }],
     "default": null
   }
   ```
   
   Nit: declaring `_FILE_SIZE` / `_ROW_COUNT` as `long` here while the Rust 
struct fields are `i32` does work (the `test_single_object_serde` test at line 
169 already exercises this round-trip), but it's worth a short comment 
explaining that `serde_avro_fast` coerces across widths; otherwise it reads as 
a bug.



##########
crates/paimon/src/table/table_write.rs:
##########
@@ -291,64 +334,118 @@ impl TableWrite {
     }
 
     /// Group rows by (partition_bytes, bucket) and return sub-batches.
-    fn divide_by_partition_bucket(
-        &self,
+    ///
+    /// In cross-partition mode, also generates DELETE sub-batches for keys that
+    /// migrated from one partition to another.
+    async fn divide_by_partition_bucket(
+        &mut self,
         batch: &RecordBatch,
     ) -> Result<Vec<(PartitionBucketKey, RecordBatch)>> {
-        // Fast path: no partitions and single bucket — skip per-row routing
-        if self.partition_field_indices.is_empty() && self.total_buckets <= 1 {
-            let bucket = if self.total_buckets == POSTPONE_BUCKET {
-                POSTPONE_BUCKET
-            } else {
-                0
-            };
-            return Ok(vec![(
-                (EMPTY_SERIALIZED_ROW.clone(), bucket),
-                batch.clone(),
-            )]);
+        // Fast path: constant bucket with no partitions — skip per-row routing
+        if let BucketAssignerEnum::Constant(ref a) = self.bucket_assigner {
+            if self.partition_keys.is_empty() {
+                return Ok(vec![(
+                    (EMPTY_SERIALIZED_ROW.clone(), a.bucket()),
+                    batch.clone(),
+                )]);
+            }
         }
 
-        let fields = self.table.schema().fields();
-        let mut groups: HashMap<PartitionBucketKey, Vec<usize>> = HashMap::new();
+        let fields = self.table.schema().fields().to_vec();
+        let output = self.bucket_assigner.assign_batch(batch, &fields).await?;
 
+        let mut groups: HashMap<PartitionBucketKey, Vec<usize>> = HashMap::new();
+        let skip_set: HashSet<usize> = output.skips.into_iter().collect();
         for row_idx in 0..batch.num_rows() {
-            let (partition_bytes, bucket) =
-                self.extract_partition_bucket(batch, row_idx, fields)?;
+            if skip_set.contains(&row_idx) {
+                continue;
+            }
             groups
-                .entry((partition_bytes, bucket))
+                .entry((
+                    output.partition_bytes[row_idx].clone(),
+                    output.buckets[row_idx],
+                ))
                 .or_default()
                 .push(row_idx);
         }
 
         let mut result = Vec::with_capacity(groups.len());
+        let has_deletes = !output.deletes.is_empty();
         for (key, row_indices) in groups {
-            let sub_batch = if row_indices.len() == batch.num_rows() {
-                batch.clone()
+            let sub_batch = Self::take_rows(batch, &row_indices)?;
+            let sub_batch = if has_deletes {
+                Self::add_value_kind_column(&sub_batch, 0)?

Review Comment:
   **Must-fix (P1): `_VALUE_KIND` column is added only when `has_deletes=true`, 
causing intra-writer schema drift.**
   
   `KeyValueFileWriter` buffers batches and later calls `concat_batches`, which 
requires identical Arrow schemas across all accumulated batches. This code path 
only appends `_VALUE_KIND` when the current invocation produced cross-partition 
deletes. If the same writer instance first receives a batch with no deletes 
(schema without `_VALUE_KIND`), then later a batch containing deletes (schema 
with `_VALUE_KIND`), the later concat fails with a schema mismatch error.
   
   Suggest one of:
   - Always add `_VALUE_KIND=0` to inserts in cross-partition-capable writers, 
keeping the writer schema stable across batches.
   - Or have `KeyValueFileWriter` own the `_VALUE_KIND` column (inject it on 
ingestion), so `TableWrite` does not need to special-case this.
   
   Please add a regression test: single `TableWrite` on a cross-partition 
table, first `write_arrow_batch` with no migrated keys, then second 
`write_arrow_batch` that migrates keys, both followed by a successful 
`prepare_commit`.
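
   The failure mode can be modeled without Arrow. The sketch below is a toy: 
`Batch`, `BufferingWriter`, and `insert_batch` are hypothetical stand-ins, and 
`flush` only imitates the schema check that this comment attributes to 
`concat_batches`.

```rust
/// Toy stand-in for an Arrow `RecordBatch`: only column names and row count matter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Batch {
    pub columns: Vec<String>,
    pub rows: usize,
}

/// Toy stand-in for a writer that buffers batches and concatenates them on flush.
#[derive(Default)]
pub struct BufferingWriter {
    buffered: Vec<Batch>,
}

impl BufferingWriter {
    pub fn write(&mut self, batch: Batch) {
        self.buffered.push(batch);
    }

    /// Concatenate buffered batches, refusing to merge differing schemas.
    /// Returns the total row count on success.
    pub fn flush(&mut self) -> Result<usize, String> {
        let batches = std::mem::take(&mut self.buffered);
        let Some(first) = batches.first() else {
            return Ok(0);
        };
        if let Some(bad) = batches.iter().find(|b| b.columns != first.columns) {
            return Err(format!(
                "schema mismatch: {:?} vs {:?}",
                first.columns, bad.columns
            ));
        }
        Ok(batches.iter().map(|b| b.rows).sum())
    }
}

/// Hypothetical helper: build an insert batch, optionally tagged with `_VALUE_KIND`.
pub fn insert_batch(rows: usize, with_value_kind: bool) -> Batch {
    let mut columns = vec!["id".to_string(), "value".to_string()];
    if with_value_kind {
        columns.push("_VALUE_KIND".to_string());
    }
    Batch { columns, rows }
}
```

   Writing `insert_batch(2, false)` followed by `insert_batch(1, true)` makes 
`flush` return an error, while tagging every batch keeps the schema stable, 
which is the behavior the first suggestion aims for.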



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
