QuakeWang commented on code in PR #193:
URL: https://github.com/apache/paimon-rust/pull/193#discussion_r3027913594


##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -184,6 +188,259 @@ impl ArrowReader {
         }
             .boxed())
     }
+
+    /// Read data files in data evolution mode, merging columns from files 
that share the same row ID range.
+    ///
+    /// Each DataSplit contains files grouped by `first_row_id`. Files within 
a split may contain
+    /// different columns for the same logical rows. This method reads each 
file and merges them
+    /// column-wise, respecting `max_sequence_number` for conflict resolution.
+    ///
+    /// `table_fields` is the full table schema fields, used to determine 
which columns each file
+    /// provides when `write_cols` is not set.
+    pub fn read_data_evolution(
+        self,
+        data_splits: &[DataSplit],
+        table_fields: &[DataField],
+    ) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let splits: Vec<DataSplit> = data_splits.to_vec();
+        let read_type = self.read_type;
+        let table_field_names: Vec<String> =
+            table_fields.iter().map(|f| f.name().to_string()).collect();
+        let projected_column_names: Vec<String> = read_type
+            .iter()
+            .map(|field| field.name().to_string())
+            .collect();
+
+        Ok(try_stream! {
+            for split in &splits {
+                if split.raw_convertible() || split.data_files().len() == 1 {
+                    // Single file or raw convertible — read normally.
+                    for file_meta in split.data_files() {
+                        let batches = read_single_file(
+                            &file_io, split, file_meta, 
&projected_column_names, batch_size, None,
+                        ).await?;
+                        for batch in batches {
+                            yield batch;
+                        }
+                    }
+                } else {
+                    // Multiple files need column-wise merge.
+                    let merged_batches = merge_files_by_columns(
+                        &file_io,
+                        split,
+                        &projected_column_names,
+                        &table_field_names,
+                        batch_size,
+                    ).await?;
+                    for batch in merged_batches {
+                        yield batch;
+                    }
+                }
+            }
+        }
+        .boxed())
+    }
+}
+
+/// Read a single parquet file from a split, returning all batches.
+/// Optionally applies a deletion vector.
+async fn read_single_file(
+    file_io: &FileIO,
+    split: &DataSplit,
+    file_meta: &DataFileMeta,
+    projected_column_names: &[String],
+    batch_size: Option<usize>,
+    dv: Option<&DeletionVector>,
+) -> crate::Result<Vec<RecordBatch>> {
+    let path_to_read = split.data_file_path(file_meta);
+    if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
+        return Err(Error::Unsupported {
+            message: format!(
+                "unsupported file format: only .parquet is supported, got: 
{path_to_read}"
+            ),
+        });
+    }
+
+    let parquet_file = file_io.new_input(&path_to_read)?;
+    let (parquet_metadata, parquet_reader) =
+        try_join!(parquet_file.metadata(), parquet_file.reader())?;
+    let arrow_file_reader = ArrowFileReader::new(parquet_metadata, 
parquet_reader);
+
+    let mut batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+    // Only project columns that exist in this file.
+    let parquet_schema = batch_stream_builder.parquet_schema().clone();
+    let file_column_names: Vec<&str> = parquet_schema.columns().iter().map(|c| 
c.name()).collect();
+    let available_columns: Vec<&str> = projected_column_names
+        .iter()
+        .filter(|name| file_column_names.contains(&name.as_str()))
+        .map(String::as_str)
+        .collect();
+
+    if available_columns.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let mask = ProjectionMask::columns(&parquet_schema, 
available_columns.iter().copied());
+    batch_stream_builder = batch_stream_builder.with_projection(mask);
+
+    if let Some(dv) = dv {
+        if !dv.is_empty() {
+            let row_selection =
+                
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
+            batch_stream_builder = 
batch_stream_builder.with_row_selection(row_selection);
+        }
+    }
+    if let Some(size) = batch_size {
+        batch_stream_builder = batch_stream_builder.with_batch_size(size);
+    }
+
+    let mut batch_stream = batch_stream_builder.build()?;
+    let mut batches = Vec::new();
+    while let Some(batch) = batch_stream.next().await {
+        batches.push(batch?);
+    }
+    Ok(batches)
+}
+
+/// Merge multiple files column-wise for data evolution.
+///
+/// All files in the split share the same `first_row_id` and `row_count`.
+/// Each file contributes a subset of columns. When multiple files provide the 
same column,
+/// the file with the higher `max_sequence_number` wins.
+async fn merge_files_by_columns(
+    file_io: &FileIO,
+    split: &DataSplit,
+    projected_column_names: &[String],
+    table_field_names: &[String],
+    batch_size: Option<usize>,
+) -> crate::Result<Vec<RecordBatch>> {
+    let data_files = split.data_files();
+    if data_files.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    // Determine which columns each file provides and resolve conflicts by 
max_sequence_number.
+    // column_name -> (file_index, max_sequence_number)
+    let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+
+    for (file_idx, file_meta) in data_files.iter().enumerate() {
+        let file_columns: Vec<String> = if let Some(ref wc) = 
file_meta.write_cols {

Review Comment:
   Yes, those three lines are part of the issue, so removing them makes sense.
   
   I just want to make sure this is sufficient: in the add-column case, if a 
projected column does not physically exist in the old files, we should still 
preserve the row count and fill that column with NULLs instead of dropping rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to