TheR1sing3un commented on code in PR #374: URL: https://github.com/apache/paimon-rust/pull/374#discussion_r3397725867
########## crates/paimon/src/table/merge_tree_split_generator.rs: ########## @@ -0,0 +1,551 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Split generation for primary-key (merge-tree) tables. +//! +//! Files whose primary-key ranges overlap must be read by the same +//! sort-merge reader, so they have to stay in the same split. This module +//! groups a bucket's files into key-range "sections" and then bin-packs +//! whole sections into splits, mirroring the Java implementation. +//! +//! References: +//! [MergeTreeSplitGenerator](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java), +//! [IntervalPartition](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java) + +use super::bin_pack::pack_for_ordered; +use crate::spec::{datum_cmp, BinaryRow, DataFileMeta, DataType, Datum, TableSchema}; +use std::cmp::{self, Ordering}; + +/// Compares serialized `BinaryRow` keys field-by-field using the trimmed +/// primary-key data types. +/// +/// BinaryRow stores fields little-endian, so raw byte comparison would order +/// e.g. 
int 256 (`[00 01 00 00]`) before int 1 (`[01 00 00 00]`); keys must +/// be decoded before comparing. +pub(crate) struct KeyComparator { + key_types: Vec<DataType>, +} + +/// A decoded key: one `Option<Datum>` per trimmed primary-key field +/// (`None` = SQL NULL). +type DecodedKey = Vec<Option<Datum>>; + +impl KeyComparator { + pub(crate) fn new(key_types: Vec<DataType>) -> Self { + Self { key_types } + } + + /// Build a comparator over a table's trimmed primary keys, matching the + /// key layout the kv writer uses for min/max keys. Returns `None` for + /// tables without primary keys. + pub(crate) fn from_table_schema(schema: &TableSchema) -> Option<Self> { + let trimmed_pks = schema.trimmed_primary_keys(); + if trimmed_pks.is_empty() { + return None; + } + let fields = schema.fields(); + let key_types: Vec<DataType> = trimmed_pks + .iter() + .filter_map(|name| { + fields + .iter() + .find(|f| f.name() == name) + .map(|f| f.data_type().clone()) + }) + .collect(); + // A PK name missing from the fields (should not happen) leaves the + // arity short; decode then fails and callers degrade safely. + Some(Self::new(key_types)) + } + + /// Decode a serialized min/max key. Returns `None` when the key is empty + /// or malformed, letting callers degrade to the safe "treat everything as + /// overlapping" path instead of failing the scan. + fn decode(&self, key: &[u8]) -> Option<DecodedKey> { + if key.is_empty() { + return None; + } + let row = BinaryRow::from_serialized_bytes(key).ok()?; + if (row.arity() as usize) != self.key_types.len() { + return None; + } + self.key_types + .iter() + .enumerate() + .map(|(pos, dt)| row.get_datum(pos, dt).ok()) + .collect() + } +} + +/// Compare decoded keys field-by-field. NULL sorts first; fields that +/// `datum_cmp` cannot order (e.g. float NaN) compare as equal, which forces +/// the files into the same section — conservative but never incorrect. 
+fn compare_decoded(a: &DecodedKey, b: &DecodedKey) -> Ordering { + for (fa, fb) in a.iter().zip(b.iter()) { + let ord = match (fa, fb) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(da), Some(db)) => datum_cmp(da, db).unwrap_or(Ordering::Equal), + }; + if ord != Ordering::Equal { + return ord; + } + } + Ordering::Equal +} + +/// A file paired with its decoded min/max keys. +struct KeyedFile { + file: DataFileMeta, + min: DecodedKey, + max: DecodedKey, +} + +/// Decode every file's key range up front. Returns `Err` carrying the +/// original files if any file lacks a usable key range, in which case callers +/// must assume full overlap. +fn decode_all( + files: Vec<DataFileMeta>, + comparator: &KeyComparator, +) -> Result<Vec<KeyedFile>, Vec<DataFileMeta>> { + let mut keyed = Vec::with_capacity(files.len()); + let mut undecodable = false; + for file in &files { + match ( + comparator.decode(&file.min_key), + comparator.decode(&file.max_key), + ) { + (Some(min), Some(max)) if !undecodable => keyed.push(KeyedFile { + file: file.clone(), + min, + max, + }), + _ => undecodable = true, + } + } + if undecodable { + Err(files) + } else { + Ok(keyed) + } +} + +/// Group files into sections by primary-key range overlap. +/// +/// Files are sorted by `(min_key, max_key)`; a running upper bound tracks the +/// max key seen in the current section, and a file whose min key exceeds the +/// bound starts a new section. Sections never overlap each other, while files +/// inside one section all transitively overlap and must be merged together. +/// +/// Files with empty or undecodable key ranges collapse everything into one +/// section: no parallelism, but never a missed merge. 
+pub(crate) fn interval_partition( + files: Vec<DataFileMeta>, + comparator: &KeyComparator, +) -> Vec<Vec<DataFileMeta>> { + if files.len() <= 1 { + return if files.is_empty() { + Vec::new() + } else { + vec![files] + }; + } + + let mut keyed = match decode_all(files, comparator) { + Ok(keyed) => keyed, + Err(files) => return vec![files], + }; + keyed.sort_by(|a, b| { + compare_decoded(&a.min, &b.min).then_with(|| compare_decoded(&a.max, &b.max)) + }); + + let mut sections: Vec<Vec<DataFileMeta>> = Vec::new(); + let mut current: Vec<DataFileMeta> = Vec::new(); + let mut bound: Option<DecodedKey> = None; + + for kf in keyed { + if let Some(ref b) = bound { + if compare_decoded(&kf.min, b) == Ordering::Greater { + sections.push(std::mem::take(&mut current)); + bound = None; + } + } + match bound { + Some(ref b) if compare_decoded(&kf.max, b) != Ordering::Greater => {} + _ => bound = Some(kf.max), + } + current.push(kf.file); + } + if !current.is_empty() { + sections.push(current); + } + sections +} + +/// Bin-pack whole sections into splits. A section is atomic: its files +/// overlap on primary key and must never be separated, even when the section +/// alone exceeds `target_split_size`. +/// +/// Mirrors Java `MergeTreeSplitGenerator#packSplits`: a section's weight is +/// `max(total file size, open_file_cost)` — the open-file cost is charged +/// once per section, not per file. +pub(crate) fn pack_sections( + sections: Vec<Vec<DataFileMeta>>, + target_split_size: i64, + open_file_cost: i64, +) -> Vec<Vec<DataFileMeta>> { + pack_for_ordered( + sections, + |section| { + cmp::max( + section.iter().map(|f| f.file_size).sum::<i64>(), + open_file_cost, + ) + }, + target_split_size, + ) + .into_iter() + .map(|sections| sections.into_iter().flatten().collect()) + .collect() +} + +/// A group of files forming one split, plus whether the split can be read +/// raw — without the sort-merge reader — so its physical row count equals +/// its logical row count. 
+/// +/// Mirrors Java `SplitGenerator.SplitGroup`. +#[derive(Debug)] +pub(crate) struct SplitGroup { + pub(crate) files: Vec<DataFileMeta>, + pub(crate) raw_convertible: bool, +} + +/// Whether a file is known to contain no DELETE rows. +/// +/// Mirrors Java `MergeTreeSplitGenerator#withoutDeleteRow`: a missing +/// `delete_row_count` is treated as "no deletes" for compatibility with files +/// written by old versions. +fn without_delete_row(file: &DataFileMeta) -> bool { + file.delete_row_count.is_none_or(|count| count == 0) +} + +/// Generate batch splits for a merge-tree (primary-key) bucket. +/// +/// Mirrors Java `MergeTreeSplitGenerator#splitForBatch` for the merging read +/// path (deletion-vector and first-row tables are routed to plain size-based +/// packing before reaching this function, matching Java's +/// `alwaysRawConvertible` fast path): +/// +/// * If every file is compacted (level != 0), has no delete rows, and all +/// files sit on a single level, no two files can overlap on key range, so +/// the files are bin-packed individually and every group is raw +/// convertible. +/// * Otherwise files are sectioned by key-range overlap and whole sections +/// are bin-packed; a group is raw convertible only when it holds exactly +/// one file without delete rows. 
+pub(crate) fn merge_tree_split_for_batch( + files: Vec<DataFileMeta>, + comparator: &KeyComparator, + target_split_size: i64, + open_file_cost: i64, +) -> Vec<SplitGroup> { + let raw_convertible = files.iter().all(|f| f.level != 0 && without_delete_row(f)); + let one_level = { + let mut levels: Vec<i32> = files.iter().map(|f| f.level).collect(); + levels.sort_unstable(); + levels.dedup(); + levels.len() == 1 + }; + + if raw_convertible && one_level { + return pack_for_ordered( + files, + |f| cmp::max(f.file_size, open_file_cost), + target_split_size, + ) + .into_iter() + .map(|files| SplitGroup { + files, + raw_convertible: true, + }) + .collect(); + } + + pack_sections( + interval_partition(files, comparator), + target_split_size, + open_file_cost, + ) + .into_iter() + .map(|files| { + let raw_convertible = files.len() == 1 && without_delete_row(&files[0]); Review Comment: Confirmed — thanks for the careful analysis. The root cause is a writer-side gap rather than the split generator itself: Java's `MergeTreeWriter#flushWriteBuffer` runs the engine's merge function over the write buffer before flushing, so every data file has unique keys regardless of merge engine — that invariant is what lets `MergeTreeSplitGenerator` mark single-file groups raw convertible. The Rust writer currently only deduplicates at flush for deduplicate/first-row (`select_flush_indices`), while partial-update intentionally keeps all rows for read-side field-wise merge, so the uniqueness premise doesn't hold for it. Fixed in this PR as you suggested: split groups are now raw convertible only when the merge engine guarantees file-internal key uniqueness (currently deduplicate only), so partial-update splits always report an unknown merged row count — consistent with the read path, which always routes them through the merge reader. Added COUNT(*) and LIMIT regression tests using your minimal case. 
As a follow-up, #380 ports Java's write-time merge (`MergeTreeWriter#flushWriteBuffer`) for partial-update, restoring the file-internal uniqueness invariant; once that lands the conservative gating here can be relaxed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
