jerry-024 commented on code in PR #254:
URL: https://github.com/apache/paimon-rust/pull/254#discussion_r3098158251
##########
crates/paimon/src/table/table_scan.rs:
##########
@@ -474,25 +474,24 @@ impl<'a> TableScan<'a> {
let file_io = self.table.file_io();
let table_path = self.table.location();
let core_options = CoreOptions::new(self.table.schema().options());
- let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
let data_evolution_enabled = core_options.data_evolution_enabled();
- let has_primary_keys = !self.table.schema().primary_keys().is_empty();
+ let has_primary_keys = !self.table.schema().trimmed_primary_keys().is_empty();
- // Skip level-0 files for PK tables when:
- // - DV mode: level-0 files are unmerged, DV handles dedup at higher levels
- // - FirstRow engine without DV: reads go through DataFileReader (no merge),
- // so only compacted (level > 0) files are safe to read directly
+ // Skip level-0 files for PK tables with FirstRow engine only.
+ // FirstRow reads go through DataFileReader (no merge), so only compacted
+ // (level > 0) files are safe to read directly.
+ // Deduplicate engine always uses KeyValueFileReader which handles level-0
+ // via sort-merge, so level-0 files must remain visible.
//
// Non-read paths (overwrite, truncate, writer restore) set scan_all_files=true
// to see all files including level-0, matching Java's CommitScanner behavior.
let skip_level_zero = if self.scan_all_files {
false
} else if has_primary_keys {
- deletion_vectors_enabled
- || core_options
- .merge_engine()
- .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)
+ core_options
+ .merge_engine()
+ .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)
Review Comment:
**Must-fix (P1, read regression): DV guard removed.**
Previous `skip_level_zero` covered both `deletion_vectors_enabled` and
`FirstRow`. After this PR, DV mode no longer skips level-0, so splits with
level-0 files stay visible.
`table_read.rs:3832-3838` routes PK + Deduplicate + any level-0 split into
`read_kv()` → `KeyValueFileReader`, and `kv_file_reader.rs:250-259` explicitly
errors when `data_deletion_files` contains any `Some`:
```rust
Err(Error::UnexpectedError {
    message: "KeyValueFileReader does not support deletion vectors".to_string(),
    ...
})
```
So DV + Deduplicate reads now hard-fail once any level-0 file exists, not
just return stale rows. Options:
1. Restore the DV guard here (and in `table_read.rs`): keep DV tables on the
`DataFileReader` / raw path.
2. Or teach `KeyValueFileReader` to apply DV (pass deletion files through
`KeyValueReadConfig`).
Option 1 is the smaller change if DV + Deduplicate is not yet a design goal.
Either way, please add a DV + Deduplicate + level-0 read test.
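A minimal sketch of option 1, written as a free function so it can be tested in isolation. `MergeEngine` is redeclared locally as a hypothetical stand-in for `crate::spec::MergeEngine`, `Option<MergeEngine>` stands in for the `Result` returned by `merge_engine()`, and the non-PK branch is assumed to return `false` because it sits outside this hunk. The matching routing change in `table_read.rs` is not shown.

```rust
/// Local stand-in for `crate::spec::MergeEngine` (hypothetical copy so the
/// sketch compiles on its own; only the two variants used here).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MergeEngine {
    Deduplicate,
    FirstRow,
}

/// Option 1: hide level-0 files from read scans whenever the split would
/// otherwise reach a reader that cannot handle them.
///
/// `merge_engine` is `None` when the option fails to parse, mirroring the
/// `is_ok_and` check in the hunk.
fn skip_level_zero(
    scan_all_files: bool,
    has_primary_keys: bool,
    deletion_vectors_enabled: bool,
    merge_engine: Option<MergeEngine>,
) -> bool {
    if scan_all_files {
        // Overwrite, truncate and writer restore must see every file.
        return false;
    }
    if !has_primary_keys {
        // Assumed: append-only tables never skip level-0 (branch not in hunk).
        return false;
    }
    // DV tables stay on the raw DataFileReader path, which cannot merge
    // level-0 files; FirstRow reads go through the same path.
    deletion_vectors_enabled || merge_engine == Some(MergeEngine::FirstRow)
}
```

With this guard, a DV + Deduplicate scan never hands a level-0 split to `KeyValueFileReader`, which is the behaviour the requested regression test should pin down.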
##########
crates/paimon/src/table/table_write.rs:
##########
@@ -125,26 +127,33 @@ impl TableWrite {
}
let total_buckets = core_options.bucket();
- let has_primary_keys = !schema.primary_keys().is_empty();
-
- if has_primary_keys {
- if total_buckets < 1 && total_buckets != POSTPONE_BUCKET {
- return Err(crate::Error::Unsupported {
- message: format!(
- "KeyValueFileWriter does not support bucket={total_buckets}, only fixed bucket (>= 1) or postpone bucket (= -2) is supported"
- ),
- });
- }
- if total_buckets != POSTPONE_BUCKET
- && core_options
- .changelog_producer()
- .eq_ignore_ascii_case("input")
- {
- return Err(crate::Error::Unsupported {
- message: "KeyValueFileWriter does not support changelog-producer=input"
- .to_string(),
- });
- }
+ let has_primary_keys = !schema.trimmed_primary_keys().is_empty();
+ let is_dynamic_bucket = has_primary_keys && total_buckets == -1;
+
+ let is_cross_partition = is_dynamic_bucket
+ && !schema.partition_keys().is_empty()
+ && schema.trimmed_primary_keys().len() == schema.primary_keys().len();
Review Comment:
**Must-fix (P1, stale data): cross-partition detection is too narrow.**
`trimmed_primary_keys().len() == primary_keys().len()` is equivalent to "PK
and partition columns do not overlap at all". Java Paimon triggers
cross-partition update whenever `!primaryKeys.containsAll(partitionKeys)`, i.e.
any partition column outside the PK.
Counter-example that this branch misses: `PARTITIONED BY (pt1, pt2) +
PRIMARY KEY (pt1, id)`. Here `trimmed=[id]` (len 1), `primary_keys=[pt1, id]`
(len 2), so `1 != 2` → routed to `DynamicBucketAssigner`. When a record moves
across `pt2` values, the PK lookup in the new partition does not see the old
row, no old-partition DELETE is emitted, and the old partition keeps stale data
forever.
Suggest:
```rust
let pk_set: HashSet<_> = schema.primary_keys().iter().collect();
let partition_not_in_pk = schema.partition_keys().iter().any(|p| !pk_set.contains(p));
let is_cross_partition = is_dynamic_bucket && partition_not_in_pk;
```
Please add a regression test: `PARTITIONED BY (pt1, pt2) + PK (pt1, id)`,
write same `id` under different `pt2`, assert old partition has a DELETE record
(or assert post-commit row count is 1).
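To make the difference concrete, here is a hedged sketch comparing the PR's check with the suggested one on plain string slices. It reimplements the trimming inline rather than calling `TableSchema`, and `DYNAMIC_BUCKET` is a hypothetical local name for the `-1` literal in the hunk.

```rust
use std::collections::HashSet;

/// Dynamic bucket mode (`bucket = -1`), as checked in the hunk above.
const DYNAMIC_BUCKET: i32 = -1;

/// Detection as written in the PR: cross-partition only when the PK and the
/// partition columns are fully disjoint (trimming removes no PK column).
fn is_cross_partition_pr(primary_keys: &[&str], partition_keys: &[&str], total_buckets: i32) -> bool {
    let partition_set: HashSet<&str> = partition_keys.iter().copied().collect();
    let trimmed_len = primary_keys
        .iter()
        .filter(|pk| !partition_set.contains(*pk))
        .count();
    // The PR derives `has_primary_keys` from the trimmed PK.
    let has_primary_keys = trimmed_len > 0;
    let is_dynamic_bucket = has_primary_keys && total_buckets == DYNAMIC_BUCKET;
    is_dynamic_bucket && !partition_keys.is_empty() && trimmed_len == primary_keys.len()
}

/// Suggested detection, mirroring Java's `!primaryKeys.containsAll(partitionKeys)`:
/// any partition column outside the PK makes the table cross-partition.
fn is_cross_partition_suggested(primary_keys: &[&str], partition_keys: &[&str], total_buckets: i32) -> bool {
    let pk_set: HashSet<&str> = primary_keys.iter().copied().collect();
    let is_dynamic_bucket = !primary_keys.is_empty() && total_buckets == DYNAMIC_BUCKET;
    let partition_not_in_pk = partition_keys.iter().any(|p| !pk_set.contains(p));
    is_dynamic_bucket && partition_not_in_pk
}
```

For the counter-example `(pt1, pt2)` / `(pt1, id)` the PR's check returns `false` while the suggested one returns `true`; the two agree when PK and partition columns are disjoint and when every partition column is inside the PK.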
##########
crates/paimon/src/table/table_write.rs:
##########
@@ -125,26 +127,33 @@ impl TableWrite {
}
let total_buckets = core_options.bucket();
- let has_primary_keys = !schema.primary_keys().is_empty();
-
- if has_primary_keys {
- if total_buckets < 1 && total_buckets != POSTPONE_BUCKET {
- return Err(crate::Error::Unsupported {
- message: format!(
- "KeyValueFileWriter does not support bucket={total_buckets}, only fixed bucket (>= 1) or postpone bucket (= -2) is supported"
- ),
- });
- }
- if total_buckets != POSTPONE_BUCKET
- && core_options
- .changelog_producer()
- .eq_ignore_ascii_case("input")
- {
- return Err(crate::Error::Unsupported {
- message: "KeyValueFileWriter does not support changelog-producer=input"
- .to_string(),
- });
- }
+ let has_primary_keys = !schema.trimmed_primary_keys().is_empty();
Review Comment:
**Must-fix (P1): `has_primary_keys` misjudgment for `PK ⊆ partition_keys`.**
Switching the definition to `trimmed_primary_keys()` causes tables where `PK
== partition_keys` (a legal schema in Paimon — `partition_keys ⊆ primary_keys`
is the only constraint, and equality is allowed) to be treated as append-only.
Because `primary_key_indices` at line 192 is also derived from
`trimmed_primary_keys()`, `create_writer` at line 521 sees
`self.primary_key_indices.is_empty()` and picks `create_append_writer`. PK
semantics (dedup, merge) are silently dropped.
Suggest separating the two concepts:
```rust
let has_primary_keys = !schema.primary_keys().is_empty();
// keep trimmed_primary_keys() only for building KV key indices below
```
Please add a regression test for `PK == partition_keys` writes: insert
duplicates within the same partition, assert the table dedupes.
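A rough sketch of the separation. `WriterKind`, `WriterPlan` and `plan` are hypothetical stand-ins for the real `create_writer` dispatch and the `primary_key_indices` field: PK-ness is decided from the untrimmed list, and the trimmed list only feeds the KV key indices.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the writer choice made in `create_writer`.
#[derive(Debug, PartialEq, Eq)]
enum WriterKind {
    Append,
    KeyValue,
}

/// Keeps "is this a PK table" apart from "which columns form the KV key".
struct WriterPlan {
    has_primary_keys: bool,
    key_indices: Vec<usize>,
}

fn plan(fields: &[&str], primary_keys: &[&str], partition_keys: &[&str]) -> WriterPlan {
    let partition_set: HashSet<&str> = partition_keys.iter().copied().collect();
    // The trimmed PK is used only to build the KV key indices.
    let key_indices = primary_keys
        .iter()
        .filter(|pk| !partition_set.contains(*pk))
        .filter_map(|pk| fields.iter().position(|f| f == pk))
        .collect();
    WriterPlan {
        // PK-ness comes from the full, untrimmed PK list.
        has_primary_keys: !primary_keys.is_empty(),
        key_indices,
    }
}

impl WriterPlan {
    fn writer_kind(&self) -> WriterKind {
        if self.has_primary_keys {
            WriterKind::KeyValue
        } else {
            WriterKind::Append
        }
    }
}
```

For `PK == partition_keys` the KV key comes out empty, so every row in a partition shares one key and the merge engine should keep a single row per partition, which is what the requested dedup test would check.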
##########
crates/paimon/src/spec/schema.rs:
##########
@@ -98,6 +98,22 @@ impl TableSchema {
&self.primary_keys
}
+ /// Primary keys with partition columns removed.
+ ///
+ /// Within a single partition the partition columns are constant, so they
+ /// are redundant in the KV key. Java Paimon calls these "trimmed primary keys".
+ pub fn trimmed_primary_keys(&self) -> Vec<String> {
+ if self.partition_keys.is_empty() {
+ return self.primary_keys.clone();
+ }
+ let partition_set: HashSet<&str> = self.partition_keys.iter().map(String::as_str).collect();
+ self.primary_keys
+ .iter()
+ .filter(|pk| !partition_set.contains(pk.as_str()))
+ .cloned()
+ .collect()
+ }
Review Comment:
**Must-fix (P1, silent data loss).** The real target is
`TableSchema::bucket_keys()` at line 145 of this file (outside this hunk, so
leaving the comment here).
`bucket_keys()` still returns `self.primary_keys.clone()` when no explicit
bucket key is set, but the read path added in this PR falls back to
`trimmed_primary_keys()` in two places:
- `crates/paimon/src/table/read_builder.rs:57-63` (`bucket_predicate`)
- `crates/paimon/src/table/table_scan.rs:511-517` (`bucket_key_fields`)
On a fixed-bucket partitioned PK table (`PARTITIONED BY (pt) + PK (pt, id)`,
default bucket key), writes compute `hash([pt, id]) % N`, but a query `WHERE
pt='a' AND id=5` computes the target bucket via `hash([id]) % N`. The target
set at `table_scan.rs:181-190` then prunes the real bucket and the query
silently returns zero rows.
Java Paimon's `TableSchema.bucketKeys()` uses `trimmedPrimaryKeys()` in this
fallback. Suggest changing line 145 to:
```rust
if !self.primary_keys.is_empty() {
return self.trimmed_primary_keys();
}
```
(Note: may return empty when `PK == partition_keys`; `FixedBucketAssigner`
construction in `table_write.rs` already falls through to the constant-bucket-0
branch when `bucket_key_indices.is_empty()`, so that case stays covered.)
Please add a regression test: partitioned PK fixed-bucket table, insert
data, query with both partition and PK columns in the predicate, assert row
count > 0.
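Sketch of the suggested fallback order on a hypothetical `SchemaSketch` struct (not the real `TableSchema`; the `explicit_bucket_keys` field and the empty fallback for append tables are assumptions). The point is that `bucket_keys()` becomes the single source of truth that both the write-side assigner and the read-side `bucket_predicate` / `bucket_key_fields` call, so `hash(keys) % N` is computed over the same columns on both sides.

```rust
use std::collections::HashSet;

/// Minimal stand-in for `TableSchema` (hypothetical; only the fields used here).
struct SchemaSketch {
    primary_keys: Vec<String>,
    partition_keys: Vec<String>,
    /// Explicit bucket key, if the table sets one (assumed representation).
    explicit_bucket_keys: Option<Vec<String>>,
}

impl SchemaSketch {
    fn trimmed_primary_keys(&self) -> Vec<String> {
        let partition_set: HashSet<&str> =
            self.partition_keys.iter().map(String::as_str).collect();
        self.primary_keys
            .iter()
            .filter(|pk| !partition_set.contains(pk.as_str()))
            .cloned()
            .collect()
    }

    /// Suggested resolution order: explicit bucket key, else trimmed PK.
    fn bucket_keys(&self) -> Vec<String> {
        if let Some(keys) = &self.explicit_bucket_keys {
            return keys.clone();
        }
        if !self.primary_keys.is_empty() {
            // May be empty when PK == partition_keys (constant bucket 0 case).
            return self.trimmed_primary_keys();
        }
        // Assumed: the append-table fallback is outside this hunk; empty here.
        Vec::new()
    }
}
```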
##########
crates/paimon/src/spec/index_manifest.rs:
##########
@@ -25,10 +25,45 @@ use std::fmt::{Display, Formatter};
use crate::Result;
+/// Avro schema for IndexManifestEntry OCF serialization.
+///
+/// Must match the serde layout of `IndexManifestEntry`.
+pub const INDEX_MANIFEST_ENTRY_SCHEMA: &str = r#"{
+ "type": "record",
+ "name": "org.apache.paimon.avro.generated.record",
+ "fields": [
+ {"name": "_VERSION", "type": "int"},
+ {"name": "_KIND", "type": "int"},
+ {"name": "_PARTITION", "type": "bytes"},
+ {"name": "_BUCKET", "type": "int"},
+ {"name": "_INDEX_TYPE", "type": "string"},
+ {"name": "_FILE_NAME", "type": "string"},
+ {"name": "_FILE_SIZE", "type": "long"},
+ {"name": "_ROW_COUNT", "type": "long"},
Review Comment:
**Must-fix (P1): Avro schema is missing `_GLOBAL_INDEX`.**
`IndexFileMeta` has a `global_index_meta: Option<GlobalIndexMeta>` field
(with `skip_serializing_if = Option::is_none`), but this schema declares no
`_GLOBAL_INDEX` field.
For HASH-only entries this is fine (the field is always `None`). The problem
is the new merge path in `table_commit.rs::write_index_manifest`: on
APPEND/OVERWRITE it reads the previous index manifest, retains entries, and
rewrites the merged list through `IndexManifest::write` with this schema. Any
pre-existing entry carrying a non-null `global_index_meta` (e.g. DV, global
index) either fails to re-serialize (depending on writer strictness) or
silently drops the field on round-trip, corrupting the manifest.
Suggest adding the field as optional, mirroring Java's
`IndexFileMeta.schema()`:
```json
{
"name": "_GLOBAL_INDEX",
"type": ["null", { ...GlobalIndexMeta record... }],
"default": null
}
```
Nit: declaring `_FILE_SIZE` / `_ROW_COUNT` as `long` here while the Rust
struct fields are `i32` does work (the `test_single_object_serde` test at line
169 already exercises this round-trip), but it's worth a short comment
explaining that `serde_avro_fast` coerces across widths; otherwise it reads as
a bug.
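A cheap guard against this kind of drift is a unit test asserting that every field the struct can serialize is declared in the schema string. Minimal, dependency-free sketch; `declared_names` and `missing_fields` are hypothetical helpers, the naive scan relies on the `"name": "` spacing used in this file (a real test might parse the JSON instead), and the list of serialized field names would have to be maintained by hand.

```rust
/// Prefix used for every field in the schema string (assumes this file's spacing).
const NAME_PREFIX: &str = "\"name\": \"";

/// Collect every `"name": "..."` value in an Avro schema string. This also picks
/// up the record name and nested field names, which is harmless for a
/// "must be declared" check.
fn declared_names(schema: &str) -> Vec<&str> {
    let mut names = Vec::new();
    let mut rest = schema;
    while let Some(start) = rest.find(NAME_PREFIX) {
        let after = &rest[start + NAME_PREFIX.len()..];
        match after.find('"') {
            Some(end) => {
                names.push(&after[..end]);
                rest = &after[end + 1..];
            }
            None => break,
        }
    }
    names
}

/// Fields the Rust struct can serialize that the schema does not declare.
fn missing_fields<'a>(schema: &str, serialized: &[&'a str]) -> Vec<&'a str> {
    let declared = declared_names(schema);
    serialized
        .iter()
        .copied()
        .filter(|f| !declared.iter().any(|d| d == f))
        .collect()
}
```

In the crate this would run against `INDEX_MANIFEST_ENTRY_SCHEMA` with `_GLOBAL_INDEX` in the expected list, and would fail until the optional field is added.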
##########
crates/paimon/src/table/table_write.rs:
##########
@@ -291,64 +334,118 @@ impl TableWrite {
}
/// Group rows by (partition_bytes, bucket) and return sub-batches.
- fn divide_by_partition_bucket(
- &self,
+ ///
+ /// In cross-partition mode, also generates DELETE sub-batches for keys that
+ /// migrated from one partition to another.
+ async fn divide_by_partition_bucket(
+ &mut self,
batch: &RecordBatch,
) -> Result<Vec<(PartitionBucketKey, RecordBatch)>> {
- // Fast path: no partitions and single bucket — skip per-row routing
- if self.partition_field_indices.is_empty() && self.total_buckets <= 1 {
- let bucket = if self.total_buckets == POSTPONE_BUCKET {
- POSTPONE_BUCKET
- } else {
- 0
- };
- return Ok(vec![(
- (EMPTY_SERIALIZED_ROW.clone(), bucket),
- batch.clone(),
- )]);
+ // Fast path: constant bucket with no partitions — skip per-row routing
+ if let BucketAssignerEnum::Constant(ref a) = self.bucket_assigner {
+ if self.partition_keys.is_empty() {
+ return Ok(vec![(
+ (EMPTY_SERIALIZED_ROW.clone(), a.bucket()),
+ batch.clone(),
+ )]);
+ }
}
- let fields = self.table.schema().fields();
- let mut groups: HashMap<PartitionBucketKey, Vec<usize>> = HashMap::new();
+ let fields = self.table.schema().fields().to_vec();
+ let output = self.bucket_assigner.assign_batch(batch, &fields).await?;
+ let mut groups: HashMap<PartitionBucketKey, Vec<usize>> = HashMap::new();
+ let skip_set: HashSet<usize> = output.skips.into_iter().collect();
for row_idx in 0..batch.num_rows() {
- let (partition_bytes, bucket) =
- self.extract_partition_bucket(batch, row_idx, fields)?;
+ if skip_set.contains(&row_idx) {
+ continue;
+ }
groups
- .entry((partition_bytes, bucket))
+ .entry((
+ output.partition_bytes[row_idx].clone(),
+ output.buckets[row_idx],
+ ))
.or_default()
.push(row_idx);
}
let mut result = Vec::with_capacity(groups.len());
+ let has_deletes = !output.deletes.is_empty();
for (key, row_indices) in groups {
- let sub_batch = if row_indices.len() == batch.num_rows() {
- batch.clone()
+ let sub_batch = Self::take_rows(batch, &row_indices)?;
+ let sub_batch = if has_deletes {
+ Self::add_value_kind_column(&sub_batch, 0)?
Review Comment:
**Must-fix (P1): `_VALUE_KIND` column is added only when `has_deletes=true`,
causing intra-writer schema drift.**
`KeyValueFileWriter` buffers batches and later calls `concat_batches`, which
requires identical Arrow schemas across all accumulated batches. This code path
only appends `_VALUE_KIND` when the current invocation produced cross-partition
deletes. If the same writer instance first receives a batch with no deletes
(schema without `_VALUE_KIND`), then later a batch containing deletes (schema
with `_VALUE_KIND`), the later concat fails with a schema mismatch error.
Suggest one of:
- Always add `_VALUE_KIND=0` to inserts in cross-partition-capable writers,
keeping the writer schema stable across batches.
- Or have `KeyValueFileWriter` own the `_VALUE_KIND` column (inject it on
ingestion), so `TableWrite` does not need to special-case this.
Please add a regression test: single `TableWrite` on a cross-partition
table, first `write_arrow_batch` with no migrated keys, then second
`write_arrow_batch` that migrates keys, both followed by a successful
`prepare_commit`.
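A toy model of the failure and of the first suggested fix. `ToyBatch`, `concat`, `prepare_as_in_pr` and `prepare_stable` are hypothetical; `ToyBatch` keeps only column names because the mismatch is purely about schemas, and `concat` only mimics the identical-schema requirement described above rather than calling Arrow.

```rust
/// Toy stand-in for an Arrow `RecordBatch`: column names and row count only.
#[derive(Debug, Clone, PartialEq)]
struct ToyBatch {
    columns: Vec<String>,
    num_rows: usize,
}

const VALUE_KIND: &str = "_VALUE_KIND";

/// Mimics the identical-schema requirement of `concat_batches`.
fn concat(batches: &[ToyBatch]) -> Result<ToyBatch, String> {
    let first = batches.first().ok_or_else(|| "no batches".to_string())?;
    for b in batches {
        if b.columns != first.columns {
            return Err(format!("schema mismatch: {:?} vs {:?}", first.columns, b.columns));
        }
    }
    Ok(ToyBatch {
        columns: first.columns.clone(),
        num_rows: batches.iter().map(|b| b.num_rows).sum(),
    })
}

/// Append `_VALUE_KIND` (the real code fills it with 0 for inserts).
fn with_value_kind(batch: &ToyBatch) -> ToyBatch {
    let mut columns = batch.columns.clone();
    columns.push(VALUE_KIND.to_string());
    ToyBatch { columns, num_rows: batch.num_rows }
}

/// Behaviour in the PR: the column depends on whether this call produced deletes.
fn prepare_as_in_pr(batch: &ToyBatch, has_deletes: bool) -> ToyBatch {
    if has_deletes { with_value_kind(batch) } else { batch.clone() }
}

/// Suggested behaviour: a cross-partition-capable writer always adds the column.
fn prepare_stable(batch: &ToyBatch, cross_partition: bool) -> ToyBatch {
    if cross_partition { with_value_kind(batch) } else { batch.clone() }
}
```

The second option (letting `KeyValueFileWriter` inject `_VALUE_KIND` itself) reaches the same invariant; what matters is that a given writer never sees both schemas.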
--
This is an automated message from the Apache Git Service.