QuakeWang commented on code in PR #193:
URL: https://github.com/apache/paimon-rust/pull/193#discussion_r3026790904


##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -184,6 +188,259 @@ impl ArrowReader {
         }
             .boxed())
     }
+
+    /// Read data files in data evolution mode, merging columns from files 
that share the same row ID range.
+    ///
+    /// Each DataSplit contains files grouped by `first_row_id`. Files within 
a split may contain
+    /// different columns for the same logical rows. This method reads each 
file and merges them
+    /// column-wise, respecting `max_sequence_number` for conflict resolution.
+    ///
+    /// `table_fields` is the full table schema fields, used to determine 
which columns each file
+    /// provides when `write_cols` is not set.
+    pub fn read_data_evolution(
+        self,
+        data_splits: &[DataSplit],
+        table_fields: &[DataField],
+    ) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let splits: Vec<DataSplit> = data_splits.to_vec();
+        let read_type = self.read_type;
+        let table_field_names: Vec<String> =
+            table_fields.iter().map(|f| f.name().to_string()).collect();
+        let projected_column_names: Vec<String> = read_type
+            .iter()
+            .map(|field| field.name().to_string())
+            .collect();
+
+        Ok(try_stream! {
+            for split in &splits {
+                if split.raw_convertible() || split.data_files().len() == 1 {
+                    // Single file or raw convertible — read normally.
+                    for file_meta in split.data_files() {
+                        let batches = read_single_file(
+                            &file_io, split, file_meta, 
&projected_column_names, batch_size, None,
+                        ).await?;
+                        for batch in batches {
+                            yield batch;
+                        }
+                    }
+                } else {
+                    // Multiple files need column-wise merge.
+                    let merged_batches = merge_files_by_columns(
+                        &file_io,
+                        split,
+                        &projected_column_names,
+                        &table_field_names,
+                        batch_size,
+                    ).await?;
+                    for batch in merged_batches {
+                        yield batch;
+                    }
+                }
+            }
+        }
+        .boxed())
+    }
+}
+
+/// Read a single parquet file from a split, returning all batches.
+/// Optionally applies a deletion vector.
+async fn read_single_file(
+    file_io: &FileIO,
+    split: &DataSplit,
+    file_meta: &DataFileMeta,
+    projected_column_names: &[String],
+    batch_size: Option<usize>,
+    dv: Option<&DeletionVector>,
+) -> crate::Result<Vec<RecordBatch>> {
+    let path_to_read = split.data_file_path(file_meta);
+    if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
+        return Err(Error::Unsupported {
+            message: format!(
+                "unsupported file format: only .parquet is supported, got: 
{path_to_read}"
+            ),
+        });
+    }
+
+    let parquet_file = file_io.new_input(&path_to_read)?;
+    let (parquet_metadata, parquet_reader) =
+        try_join!(parquet_file.metadata(), parquet_file.reader())?;
+    let arrow_file_reader = ArrowFileReader::new(parquet_metadata, 
parquet_reader);
+
+    let mut batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+    // Only project columns that exist in this file.
+    let parquet_schema = batch_stream_builder.parquet_schema().clone();
+    let file_column_names: Vec<&str> = parquet_schema.columns().iter().map(|c| 
c.name()).collect();
+    let available_columns: Vec<&str> = projected_column_names
+        .iter()
+        .filter(|name| file_column_names.contains(&name.as_str()))
+        .map(String::as_str)
+        .collect();
+
+    if available_columns.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let mask = ProjectionMask::columns(&parquet_schema, 
available_columns.iter().copied());
+    batch_stream_builder = batch_stream_builder.with_projection(mask);
+
+    if let Some(dv) = dv {
+        if !dv.is_empty() {
+            let row_selection =
+                
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
+            batch_stream_builder = 
batch_stream_builder.with_row_selection(row_selection);
+        }
+    }
+    if let Some(size) = batch_size {
+        batch_stream_builder = batch_stream_builder.with_batch_size(size);
+    }
+
+    let mut batch_stream = batch_stream_builder.build()?;
+    let mut batches = Vec::new();
+    while let Some(batch) = batch_stream.next().await {
+        batches.push(batch?);
+    }
+    Ok(batches)
+}
+
+/// Merge multiple files column-wise for data evolution.
+///
+/// All files in the split share the same `first_row_id` and `row_count`.
+/// Each file contributes a subset of columns. When multiple files provide the 
same column,
+/// the file with the higher `max_sequence_number` wins.
+async fn merge_files_by_columns(
+    file_io: &FileIO,
+    split: &DataSplit,
+    projected_column_names: &[String],
+    table_field_names: &[String],
+    batch_size: Option<usize>,
+) -> crate::Result<Vec<RecordBatch>> {
+    let data_files = split.data_files();
+    if data_files.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    // Determine which columns each file provides and resolve conflicts by 
max_sequence_number.
+    // column_name -> (file_index, max_sequence_number)
+    let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+
+    for (file_idx, file_meta) in data_files.iter().enumerate() {
+        let file_columns: Vec<String> = if let Some(ref wc) = 
file_meta.write_cols {

Review Comment:
   Falling back to “all columns from the current table schema” when `write_cols 
== None` is not correct here.
   
   One of the main data-evolution scenarios is reading old files after the 
table has added new columns. In that case, an old file only contains fields 
from the schema identified by `file_meta.schema_id`; using the current table 
schema here will incorrectly mark later-added columns as if this file already 
provided them. That affects winning-column resolution, and it can also drop 
entire row groups when the projection only contains newly added columns.



##########
crates/paimon/src/table/table_scan.rs:
##########
@@ -321,26 +370,52 @@ impl<'a> TableScan<'a> {
                 .as_ref()
                 .and_then(|map| map.get(&PartitionBucket::new(partition, 
bucket)));
 
-            let file_groups = split_for_batch(data_files, target_split_size, 
open_file_cost);
-            for file_group in file_groups {
-                let data_deletion_files = 
per_bucket_deletion_map.map(|per_bucket| {
-                    file_group
-                        .iter()
-                        .map(|f| per_bucket.get(&f.file_name).cloned())
-                        .collect::<Vec<Option<DeletionFile>>>()
-                });
-
-                let mut builder = DataSplitBuilder::new()
-                    .with_snapshot(snapshot_id)
-                    .with_partition(partition_row.clone())
-                    .with_bucket(bucket)
-                    .with_bucket_path(bucket_path.clone())
-                    .with_total_buckets(total_buckets)
-                    .with_data_files(file_group);
-                if let Some(files) = data_deletion_files {
-                    builder = builder.with_data_deletion_files(files);
+            if data_evolution_enabled {
+                let file_groups = split_by_row_id(data_files);

Review Comment:
   The split-generation logic here diverges from upstream 
`DataEvolutionSplitGenerator`.
   
   Upstream does not simply group by equal `first_row_id` and emit one split 
per group. It first merges overlapping `row_id_range`s, then applies ordered 
bin packing using `target_split_size`/`open_file_cost`, and computes 
`rawConvertible` from the packed result. The current implementation introduces 
two regressions:
     1. `source.split.*` is effectively bypassed in data-evolution mode, so 
splits become much more fragmented.
     2. Grouping only by `first_row_id` misses the overlapping row-id-range 
case, which no longer matches upstream grouping semantics.
   
     I think we should align this with the Java split generator before merging 
the new read path.



##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -184,6 +188,259 @@ impl ArrowReader {
         }
             .boxed())
     }
+
+    /// Read data files in data evolution mode, merging columns from files 
that share the same row ID range.
+    ///
+    /// Each DataSplit contains files grouped by `first_row_id`. Files within 
a split may contain
+    /// different columns for the same logical rows. This method reads each 
file and merges them
+    /// column-wise, respecting `max_sequence_number` for conflict resolution.
+    ///
+    /// `table_fields` is the full table schema fields, used to determine 
which columns each file
+    /// provides when `write_cols` is not set.
+    pub fn read_data_evolution(
+        self,
+        data_splits: &[DataSplit],
+        table_fields: &[DataField],
+    ) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let splits: Vec<DataSplit> = data_splits.to_vec();
+        let read_type = self.read_type;
+        let table_field_names: Vec<String> =
+            table_fields.iter().map(|f| f.name().to_string()).collect();
+        let projected_column_names: Vec<String> = read_type
+            .iter()
+            .map(|field| field.name().to_string())
+            .collect();
+
+        Ok(try_stream! {
+            for split in &splits {
+                if split.raw_convertible() || split.data_files().len() == 1 {

Review Comment:
   The raw-convertible / single-file branch yields `read_single_file()` 
directly, but this helper does not reorder columns back to 
`projected_column_names` after `ProjectionMask`, unlike the existing `read()` 
path.
   
   Parquet returns file-schema order, not request order, so a projection like 
`["value", "id"]` will come back as `["id", "value"]` for single-file splits. 
That makes this path inconsistent with the current `read()` behavior, and it 
can also make the new projection test flaky when the plan contains single-file 
groups.
   
   Should we preserve the same reorder logic here so that the data-evolution 
raw path matches the normal read path?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to