QuakeWang commented on code in PR #193:
URL: https://github.com/apache/paimon-rust/pull/193#discussion_r3027687148


##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -184,6 +188,259 @@ impl ArrowReader {
         }
             .boxed())
     }
+
+    /// Read data files in data evolution mode, merging columns from files 
that share the same row ID range.
+    ///
+    /// Each DataSplit contains files grouped by `first_row_id`. Files within 
a split may contain
+    /// different columns for the same logical rows. This method reads each 
file and merges them
+    /// column-wise, respecting `max_sequence_number` for conflict resolution.
+    ///
+    /// `table_fields` is the full table schema fields, used to determine 
which columns each file
+    /// provides when `write_cols` is not set.
+    pub fn read_data_evolution(
+        self,
+        data_splits: &[DataSplit],
+        table_fields: &[DataField],
+    ) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let splits: Vec<DataSplit> = data_splits.to_vec();
+        let read_type = self.read_type;
+        let table_field_names: Vec<String> =
+            table_fields.iter().map(|f| f.name().to_string()).collect();
+        let projected_column_names: Vec<String> = read_type
+            .iter()
+            .map(|field| field.name().to_string())
+            .collect();
+
+        Ok(try_stream! {
+            for split in &splits {
+                if split.raw_convertible() || split.data_files().len() == 1 {
+                    // Single file or raw convertible — read normally.
+                    for file_meta in split.data_files() {
+                        let batches = read_single_file(
+                            &file_io, split, file_meta, 
&projected_column_names, batch_size, None,
+                        ).await?;
+                        for batch in batches {
+                            yield batch;
+                        }
+                    }
+                } else {
+                    // Multiple files need column-wise merge.
+                    let merged_batches = merge_files_by_columns(
+                        &file_io,
+                        split,
+                        &projected_column_names,
+                        &table_field_names,
+                        batch_size,
+                    ).await?;
+                    for batch in merged_batches {
+                        yield batch;
+                    }
+                }
+            }
+        }
+        .boxed())
+    }
+}
+
+/// Read a single parquet file from a split, returning all batches.
+/// Optionally applies a deletion vector.
+async fn read_single_file(
+    file_io: &FileIO,
+    split: &DataSplit,
+    file_meta: &DataFileMeta,
+    projected_column_names: &[String],
+    batch_size: Option<usize>,
+    dv: Option<&DeletionVector>,
+) -> crate::Result<Vec<RecordBatch>> {
+    let path_to_read = split.data_file_path(file_meta);
+    if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
+        return Err(Error::Unsupported {
+            message: format!(
+                "unsupported file format: only .parquet is supported, got: 
{path_to_read}"
+            ),
+        });
+    }
+
+    let parquet_file = file_io.new_input(&path_to_read)?;
+    let (parquet_metadata, parquet_reader) =
+        try_join!(parquet_file.metadata(), parquet_file.reader())?;
+    let arrow_file_reader = ArrowFileReader::new(parquet_metadata, 
parquet_reader);
+
+    let mut batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+    // Only project columns that exist in this file.
+    let parquet_schema = batch_stream_builder.parquet_schema().clone();
+    let file_column_names: Vec<&str> = parquet_schema.columns().iter().map(|c| 
c.name()).collect();
+    let available_columns: Vec<&str> = projected_column_names
+        .iter()
+        .filter(|name| file_column_names.contains(&name.as_str()))
+        .map(String::as_str)
+        .collect();
+
+    if available_columns.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let mask = ProjectionMask::columns(&parquet_schema, 
available_columns.iter().copied());
+    batch_stream_builder = batch_stream_builder.with_projection(mask);
+
+    if let Some(dv) = dv {
+        if !dv.is_empty() {
+            let row_selection =
+                
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
+            batch_stream_builder = 
batch_stream_builder.with_row_selection(row_selection);
+        }
+    }
+    if let Some(size) = batch_size {
+        batch_stream_builder = batch_stream_builder.with_batch_size(size);
+    }
+
+    let mut batch_stream = batch_stream_builder.build()?;
+    let mut batches = Vec::new();
+    while let Some(batch) = batch_stream.next().await {
+        batches.push(batch?);
+    }
+    Ok(batches)
+}
+
+/// Merge multiple files column-wise for data evolution.
+///
+/// All files in the split share the same `first_row_id` and `row_count`.
+/// Each file contributes a subset of columns. When multiple files provide the 
same column,
+/// the file with the higher `max_sequence_number` wins.
+async fn merge_files_by_columns(
+    file_io: &FileIO,
+    split: &DataSplit,
+    projected_column_names: &[String],
+    table_field_names: &[String],
+    batch_size: Option<usize>,
+) -> crate::Result<Vec<RecordBatch>> {
+    let data_files = split.data_files();
+    if data_files.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    // Determine which columns each file provides and resolve conflicts by 
max_sequence_number.
+    // column_name -> (file_index, max_sequence_number)
+    let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+
+    for (file_idx, file_meta) in data_files.iter().enumerate() {
+        let file_columns: Vec<String> = if let Some(ref wc) = 
file_meta.write_cols {

Review Comment:
   > The old file can be treated as `(id, name, age)`, where `age` is null. 
Actually, this is schema evolution.
   > 
   > I don't want to introduce reading of old-schema files, because that 
would also have to address issues such as column type changes.
   
   @JingsongLi I get your point. I agree we do not necessarily need to 
introduce historical schema reading here, especially if that would expand the 
scope to type evolution.
   
   My concern is mainly that, even under the “current schema + NULL for missing 
columns” semantics, the current fallback may still behave incorrectly in some 
cases. For example, when projecting only a newly added column, I think this 
path may return 0 rows instead of preserving the row count and filling NULLs: 
`read_single_file` returns an empty `Vec` when none of the projected columns 
exist in the file.
   
   So my point is less about requiring old-schema reads, and more about whether 
the current fallback already implements the expected schema-evolution behavior 
correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to