QuakeWang commented on code in PR #374:
URL: https://github.com/apache/paimon-rust/pull/374#discussion_r3396440436


##########
crates/paimon/src/table/merge_tree_split_generator.rs:
##########
@@ -0,0 +1,551 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Split generation for primary-key (merge-tree) tables.
+//!
+//! Files whose primary-key ranges overlap must be read by the same
+//! sort-merge reader, so they have to stay in the same split. This module
+//! groups a bucket's files into key-range "sections" and then bin-packs
+//! whole sections into splits, mirroring the Java implementation.
+//!
+//! References:
+//! 
[MergeTreeSplitGenerator](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java),
+//! 
[IntervalPartition](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java)
+
+use super::bin_pack::pack_for_ordered;
+use crate::spec::{datum_cmp, BinaryRow, DataFileMeta, DataType, Datum, 
TableSchema};
+use std::cmp::{self, Ordering};
+
+/// Compares serialized `BinaryRow` keys field-by-field using the trimmed
+/// primary-key data types.
+///
+/// BinaryRow stores fields little-endian, so raw byte comparison would order
+/// e.g. int 256 (`[00 01 00 00]`) before int 1 (`[01 00 00 00]`); keys must
+/// be decoded before comparing.
+pub(crate) struct KeyComparator {
+    key_types: Vec<DataType>,
+}
+
+/// A decoded key: one `Option<Datum>` per trimmed primary-key field
+/// (`None` = SQL NULL).
+type DecodedKey = Vec<Option<Datum>>;
+
+impl KeyComparator {
+    pub(crate) fn new(key_types: Vec<DataType>) -> Self {
+        Self { key_types }
+    }
+
+    /// Build a comparator over a table's trimmed primary keys, matching the
+    /// key layout the kv writer uses for min/max keys. Returns `None` for
+    /// tables without primary keys.
+    pub(crate) fn from_table_schema(schema: &TableSchema) -> Option<Self> {
+        let trimmed_pks = schema.trimmed_primary_keys();
+        if trimmed_pks.is_empty() {
+            return None;
+        }
+        let fields = schema.fields();
+        let key_types: Vec<DataType> = trimmed_pks
+            .iter()
+            .filter_map(|name| {
+                fields
+                    .iter()
+                    .find(|f| f.name() == name)
+                    .map(|f| f.data_type().clone())
+            })
+            .collect();
+        // A PK name missing from the fields (should not happen) leaves the
+        // arity short; decode then fails and callers degrade safely.
+        Some(Self::new(key_types))
+    }
+
+    /// Decode a serialized min/max key. Returns `None` when the key is empty
+    /// or malformed, letting callers degrade to the safe "treat everything as
+    /// overlapping" path instead of failing the scan.
+    fn decode(&self, key: &[u8]) -> Option<DecodedKey> {
+        if key.is_empty() {
+            return None;
+        }
+        let row = BinaryRow::from_serialized_bytes(key).ok()?;
+        if (row.arity() as usize) != self.key_types.len() {
+            return None;
+        }
+        self.key_types
+            .iter()
+            .enumerate()
+            .map(|(pos, dt)| row.get_datum(pos, dt).ok())
+            .collect()
+    }
+}
+
+/// Compare decoded keys field-by-field. NULL sorts first; fields that
+/// `datum_cmp` cannot order (e.g. float NaN) compare as equal, which forces
+/// the files into the same section — conservative but never incorrect.
+fn compare_decoded(a: &DecodedKey, b: &DecodedKey) -> Ordering {
+    for (fa, fb) in a.iter().zip(b.iter()) {
+        let ord = match (fa, fb) {
+            (None, None) => Ordering::Equal,
+            (None, Some(_)) => Ordering::Less,
+            (Some(_), None) => Ordering::Greater,
+            (Some(da), Some(db)) => datum_cmp(da, 
db).unwrap_or(Ordering::Equal),
+        };
+        if ord != Ordering::Equal {
+            return ord;
+        }
+    }
+    Ordering::Equal
+}
+
+/// A file paired with its decoded min/max keys.
+struct KeyedFile {
+    file: DataFileMeta,
+    min: DecodedKey,
+    max: DecodedKey,
+}
+
+/// Decode every file's key range up front. Returns `None` if any file lacks
+/// a usable key range, in which case callers must assume full overlap.
+fn decode_all(
+    files: Vec<DataFileMeta>,
+    comparator: &KeyComparator,
+) -> Result<Vec<KeyedFile>, Vec<DataFileMeta>> {
+    let mut keyed = Vec::with_capacity(files.len());
+    let mut undecodable = false;
+    for file in &files {
+        match (
+            comparator.decode(&file.min_key),
+            comparator.decode(&file.max_key),
+        ) {
+            (Some(min), Some(max)) if !undecodable => keyed.push(KeyedFile {
+                file: file.clone(),
+                min,
+                max,
+            }),
+            _ => undecodable = true,
+        }
+    }
+    if undecodable {
+        Err(files)
+    } else {
+        Ok(keyed)
+    }
+}
+
+/// Group files into sections by primary-key range overlap.
+///
+/// Files are sorted by `(min_key, max_key)`; a running upper bound tracks the
+/// max key seen in the current section, and a file whose min key exceeds the
+/// bound starts a new section. Sections never overlap each other, while files
+/// inside one section all transitively overlap and must be merged together.
+///
+/// Files with empty or undecodable key ranges collapse everything into one
+/// section: no parallelism, but never a missed merge.
+pub(crate) fn interval_partition(
+    files: Vec<DataFileMeta>,
+    comparator: &KeyComparator,
+) -> Vec<Vec<DataFileMeta>> {
+    if files.len() <= 1 {
+        return if files.is_empty() {
+            Vec::new()
+        } else {
+            vec![files]
+        };
+    }
+
+    let mut keyed = match decode_all(files, comparator) {
+        Ok(keyed) => keyed,
+        Err(files) => return vec![files],
+    };
+    keyed.sort_by(|a, b| {
+        compare_decoded(&a.min, &b.min).then_with(|| compare_decoded(&a.max, 
&b.max))
+    });
+
+    let mut sections: Vec<Vec<DataFileMeta>> = Vec::new();
+    let mut current: Vec<DataFileMeta> = Vec::new();
+    let mut bound: Option<DecodedKey> = None;
+
+    for kf in keyed {
+        if let Some(ref b) = bound {
+            if compare_decoded(&kf.min, b) == Ordering::Greater {
+                sections.push(std::mem::take(&mut current));
+                bound = None;
+            }
+        }
+        match bound {
+            Some(ref b) if compare_decoded(&kf.max, b) != Ordering::Greater => 
{}
+            _ => bound = Some(kf.max),
+        }
+        current.push(kf.file);
+    }
+    if !current.is_empty() {
+        sections.push(current);
+    }
+    sections
+}
+
+/// Bin-pack whole sections into splits. A section is atomic: its files
+/// overlap on primary key and must never be separated, even when the section
+/// alone exceeds `target_split_size`.
+///
+/// Mirrors Java `MergeTreeSplitGenerator#packSplits`: a section's weight is
+/// `max(total file size, open_file_cost)` — the open-file cost is charged
+/// once per section, not per file.
+pub(crate) fn pack_sections(
+    sections: Vec<Vec<DataFileMeta>>,
+    target_split_size: i64,
+    open_file_cost: i64,
+) -> Vec<Vec<DataFileMeta>> {
+    pack_for_ordered(
+        sections,
+        |section| {
+            cmp::max(
+                section.iter().map(|f| f.file_size).sum::<i64>(),
+                open_file_cost,
+            )
+        },
+        target_split_size,
+    )
+    .into_iter()
+    .map(|sections| sections.into_iter().flatten().collect())
+    .collect()
+}
+
+/// A group of files forming one split, plus whether the split can be read
+/// raw — without the sort-merge reader — so its physical row count equals
+/// its logical row count.
+///
+/// Mirrors Java `SplitGenerator.SplitGroup`.
+#[derive(Debug)]
+pub(crate) struct SplitGroup {
+    pub(crate) files: Vec<DataFileMeta>,
+    pub(crate) raw_convertible: bool,
+}
+
+/// Whether a file is known to contain no DELETE rows.
+///
+/// Mirrors Java `MergeTreeSplitGenerator#withoutDeleteRow`: a missing
+/// `delete_row_count` is treated as "no deletes" for compatibility with files
+/// written by old versions.
+fn without_delete_row(file: &DataFileMeta) -> bool {
+    file.delete_row_count.is_none_or(|count| count == 0)
+}
+
+/// Generate batch splits for a merge-tree (primary-key) bucket.
+///
+/// Mirrors Java `MergeTreeSplitGenerator#splitForBatch` for the merging read
+/// path (deletion-vector and first-row tables are routed to plain size-based
+/// packing before reaching this function, matching Java's
+/// `alwaysRawConvertible` fast path):
+///
+/// * If every file is compacted (level != 0), has no delete rows, and all
+///   files sit on a single level, no two files can overlap on key range, so
+///   the files are bin-packed individually and every group is raw
+///   convertible.
+/// * Otherwise files are sectioned by key-range overlap and whole sections
+///   are bin-packed; a group is raw convertible only when it holds exactly
+///   one file without delete rows.
+pub(crate) fn merge_tree_split_for_batch(
+    files: Vec<DataFileMeta>,
+    comparator: &KeyComparator,
+    target_split_size: i64,
+    open_file_cost: i64,
+) -> Vec<SplitGroup> {
+    let raw_convertible = files.iter().all(|f| f.level != 0 && 
without_delete_row(f));
+    let one_level = {
+        let mut levels: Vec<i32> = files.iter().map(|f| f.level).collect();
+        levels.sort_unstable();
+        levels.dedup();
+        levels.len() == 1
+    };
+
+    if raw_convertible && one_level {
+        return pack_for_ordered(
+            files,
+            |f| cmp::max(f.file_size, open_file_cost),
+            target_split_size,
+        )
+        .into_iter()
+        .map(|files| SplitGroup {
+            files,
+            raw_convertible: true,
+        })
+        .collect();
+    }
+
+    pack_sections(
+        interval_partition(files, comparator),
+        target_split_size,
+        open_file_cost,
+    )
+    .into_iter()
+    .map(|files| {
+        let raw_convertible = files.len() == 1 && 
without_delete_row(&files[0]);

Review Comment:
   This is not safe for partial-update tables. The Rust partial-update writer 
can keep multiple physical rows for the same primary key inside a single data 
file, and `TableRead` always routes partial-update reads through the KV merge 
reader for that reason.
   
   If this single-file group is marked `raw_convertible=true`, 
`DataSplit::merged_row_count()` reports the physical row count as exact. That 
can make `COUNT(*)` return too many rows and can let LIMIT pushdown prune later 
splits too early.
   
   A minimal case is one INSERT writing three partial updates for the same key: 
normal SELECT merges to one row, but COUNT(*) sees 3. Please keep 
partial-update splits non-raw-convertible unless file-internal key uniqueness 
is proven, and add COUNT/LIMIT regression coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to