luoyuxia commented on code in PR #207:
URL: https://github.com/apache/paimon-rust/pull/207#discussion_r3039755320


##########
crates/paimon/src/table/table_scan.rs:
##########
@@ -667,14 +694,17 @@ impl<'a> TableScan<'a> {
                 if let Some(files) = data_deletion_files {
                     builder = builder.with_data_deletion_files(files);
                 }
+                if let Some(row_ranges) = split_row_ranges {
+                    builder = builder.with_row_ranges(row_ranges);
+                }
                 splits.push(builder.build()?);
             }
         }
 
         // Apply limit pushdown only when there are no data predicates.
         // With data predicates, merged_row_count() reflects pre-filter row 
counts,
         // so stopping early could return fewer rows than the limit after 
filtering.
-        let splits = if self.data_predicates.is_empty() {
+        let splits = if self.data_predicates.is_empty() && 
self.row_ranges.is_none() {

Review Comment:
   nit: may also update comment to reflect newly row_ranges



##########
crates/paimon/src/table/source.rs:
##########
@@ -22,6 +22,323 @@
 use crate::spec::{BinaryRow, DataFileMeta};
 use crate::table::stats_filter::group_by_overlapping_row_id;
 use serde::{Deserialize, Serialize};
+// ======================= RowRange ===============================
+
+/// An inclusive row ID range `[from, to]` for filtering reads in data 
evolution mode.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct RowRange {
+    from: i64,
+    to: i64,
+}
+
+impl RowRange {
+    pub fn new(from: i64, to: i64) -> Self {
+        debug_assert!(from <= to, "RowRange from ({from}) must be <= to 
({to})");

Review Comment:
   debug assert! won't prevent illegal  RowRange in release. 
   Either `assert!` or return `Result<Self>`



##########
crates/paimon/src/table/source.rs:
##########
@@ -22,6 +22,323 @@
 use crate::spec::{BinaryRow, DataFileMeta};
 use crate::table::stats_filter::group_by_overlapping_row_id;
 use serde::{Deserialize, Serialize};
+// ======================= RowRange ===============================
+
+/// An inclusive row ID range `[from, to]` for filtering reads in data 
evolution mode.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct RowRange {
+    from: i64,
+    to: i64,
+}
+
+impl RowRange {
+    pub fn new(from: i64, to: i64) -> Self {
+        debug_assert!(from <= to, "RowRange from ({from}) must be <= to 
({to})");
+        Self { from, to }
+    }
+
+    pub fn from(&self) -> i64 {
+        self.from
+    }
+
+    pub fn to(&self) -> i64 {
+        self.to
+    }
+
+    pub fn count(&self) -> i64 {
+        self.to - self.from + 1
+    }
+
+    /// Check overlap with an inclusive file range `[file_start, file_end]`.
+    pub fn overlaps_inclusive(&self, file_start: i64, file_end_inclusive: i64) 
-> bool {
+        self.from <= file_end_inclusive && self.to >= file_start
+    }
+
+    /// Intersect with an inclusive file range `[file_start, file_end]`.
+    pub fn intersect_inclusive(
+        &self,
+        file_start: i64,
+        file_end_inclusive: i64,
+    ) -> Option<RowRange> {
+        let from = self.from.max(file_start);
+        let to = self.to.min(file_end_inclusive);
+        if from <= to {
+            Some(RowRange::new(from, to))
+        } else {
+            None
+        }
+    }
+}
+
+/// Returns `true` if the file has no `first_row_id`.
+pub fn any_range_overlaps_file(ranges: &[RowRange], file: &DataFileMeta) -> 
bool {
+    match file.row_id_range() {
+        None => true,
+        Some((file_start, file_end)) => ranges
+            .iter()
+            .any(|r| r.overlaps_inclusive(file_start, file_end)),
+    }
+}
+
+pub fn intersect_ranges_with_file(ranges: &[RowRange], file: &DataFileMeta) -> 
Vec<RowRange> {
+    match file.row_id_range() {
+        None => Vec::new(),
+        Some((file_start, file_end)) => ranges
+            .iter()
+            .filter_map(|r| r.intersect_inclusive(file_start, file_end))
+            .collect(),
+    }
+}
+
+pub fn merge_row_ranges(mut ranges: Vec<RowRange>) -> Vec<RowRange> {
+    if ranges.len() <= 1 {
+        return ranges;
+    }
+    ranges.sort_by_key(|r| r.from);
+    let mut merged: Vec<RowRange> = Vec::with_capacity(ranges.len());
+    let mut current = ranges[0].clone();
+    for r in &ranges[1..] {
+        if r.from <= current.to.saturating_add(1) {
+            current.to = current.to.max(r.to);
+        } else {
+            merged.push(current);
+            current = r.clone();

Review Comment:
   nit: we don't need to do the clone, just take over the owership
   ```
   if ranges.len() <= 1 {
           return ranges;
       }
   
       ranges.sort_by_key(|r| r.from);
   
       let mut merged = Vec::with_capacity(ranges.len());
       let mut iter = ranges.into_iter();
       let mut current = iter.next().unwrap();
   
       for r in iter {
           if r.from <= current.to.saturating_add(1) {
               current.to = current.to.max(r.to);
           } else {
               merged.push(current);
               current = r;
           }
       }
   
       merged.push(current);
       merged
   ```



##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -596,8 +710,12 @@ fn merge_files_by_columns(
         // column that no file contains yet), we still need to emit 
NULL-filled rows to
         // preserve the correct row count.
         if active_file_indices.is_empty() {
-            // All files in a merge group cover the same rows; use the first 
file's row_count.
-            let total_rows = data_files[0].row_count as usize;
+            let first_row_id = data_files[0].first_row_id.unwrap_or(0);
+            let file_row_count = data_files[0].row_count;
+            let total_rows = match &row_ranges {
+                Some(ranges) => expand_selected_row_ids(first_row_id, 
file_row_count, ranges).len(),

Review Comment:
   nit: 
   in plan phase the ranges in the data split are already merge_row_ranges, but 
the read phase here, it will still do another merge_row_ranges? Is it 
duplicated?



##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -1355,6 +1474,112 @@ fn exact_parquet_value<'a, T>(
     }
 }
 
+/// Expand row_ranges into a flat sequence of selected row IDs for a file.
+fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: 
&[RowRange]) -> Vec<i64> {
+    let file_end = first_row_id + row_count - 1;

Review Comment:
   what if row_count is 0, is expected file_end will be less than first_row_id?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to