JingsongLi commented on code in PR #193:
URL: https://github.com/apache/paimon-rust/pull/193#discussion_r3027765903
##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -184,6 +188,259 @@ impl ArrowReader {
}
.boxed())
}
+
+ /// Read data files in data evolution mode, merging columns from files
that share the same row ID range.
+ ///
+ /// Each DataSplit contains files grouped by `first_row_id`. Files within
a split may contain
+ /// different columns for the same logical rows. This method reads each
file and merges them
+ /// column-wise, respecting `max_sequence_number` for conflict resolution.
+ ///
+ /// `table_fields` is the full table schema fields, used to determine
which columns each file
+ /// provides when `write_cols` is not set.
+ pub fn read_data_evolution(
+ self,
+ data_splits: &[DataSplit],
+ table_fields: &[DataField],
+ ) -> crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size;
+ let splits: Vec<DataSplit> = data_splits.to_vec();
+ let read_type = self.read_type;
+ let table_field_names: Vec<String> =
+ table_fields.iter().map(|f| f.name().to_string()).collect();
+ let projected_column_names: Vec<String> = read_type
+ .iter()
+ .map(|field| field.name().to_string())
+ .collect();
+
+ Ok(try_stream! {
+ for split in &splits {
+ if split.raw_convertible() || split.data_files().len() == 1 {
+ // Single file or raw convertible — read normally.
+ for file_meta in split.data_files() {
+ let batches = read_single_file(
+ &file_io, split, file_meta,
&projected_column_names, batch_size, None,
+ ).await?;
+ for batch in batches {
+ yield batch;
+ }
+ }
+ } else {
+ // Multiple files need column-wise merge.
+ let merged_batches = merge_files_by_columns(
+ &file_io,
+ split,
+ &projected_column_names,
+ &table_field_names,
+ batch_size,
+ ).await?;
+ for batch in merged_batches {
+ yield batch;
+ }
+ }
+ }
+ }
+ .boxed())
+ }
+}
+
+/// Read a single parquet file from a split, returning all batches.
+/// Optionally applies a deletion vector.
+async fn read_single_file(
+ file_io: &FileIO,
+ split: &DataSplit,
+ file_meta: &DataFileMeta,
+ projected_column_names: &[String],
+ batch_size: Option<usize>,
+ dv: Option<&DeletionVector>,
+) -> crate::Result<Vec<RecordBatch>> {
+ let path_to_read = split.data_file_path(file_meta);
+ if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
+ return Err(Error::Unsupported {
+ message: format!(
+ "unsupported file format: only .parquet is supported, got:
{path_to_read}"
+ ),
+ });
+ }
+
+ let parquet_file = file_io.new_input(&path_to_read)?;
+ let (parquet_metadata, parquet_reader) =
+ try_join!(parquet_file.metadata(), parquet_file.reader())?;
+ let arrow_file_reader = ArrowFileReader::new(parquet_metadata,
parquet_reader);
+
+ let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+ // Only project columns that exist in this file.
+ let parquet_schema = batch_stream_builder.parquet_schema().clone();
+ let file_column_names: Vec<&str> = parquet_schema.columns().iter().map(|c|
c.name()).collect();
+ let available_columns: Vec<&str> = projected_column_names
+ .iter()
+ .filter(|name| file_column_names.contains(&name.as_str()))
+ .map(String::as_str)
+ .collect();
+
+ if available_columns.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ let mask = ProjectionMask::columns(&parquet_schema,
available_columns.iter().copied());
+ batch_stream_builder = batch_stream_builder.with_projection(mask);
+
+ if let Some(dv) = dv {
+ if !dv.is_empty() {
+ let row_selection =
+
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
+ batch_stream_builder =
batch_stream_builder.with_row_selection(row_selection);
+ }
+ }
+ if let Some(size) = batch_size {
+ batch_stream_builder = batch_stream_builder.with_batch_size(size);
+ }
+
+ let mut batch_stream = batch_stream_builder.build()?;
+ let mut batches = Vec::new();
+ while let Some(batch) = batch_stream.next().await {
+ batches.push(batch?);
+ }
+ Ok(batches)
+}
+
+/// Merge multiple files column-wise for data evolution.
+///
+/// All files in the split share the same `first_row_id` and `row_count`.
+/// Each file contributes a subset of columns. When multiple files provide the
same column,
+/// the file with the higher `max_sequence_number` wins.
+async fn merge_files_by_columns(
+ file_io: &FileIO,
+ split: &DataSplit,
+ projected_column_names: &[String],
+ table_field_names: &[String],
+ batch_size: Option<usize>,
+) -> crate::Result<Vec<RecordBatch>> {
+ let data_files = split.data_files();
+ if data_files.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ // Determine which columns each file provides and resolve conflicts by
max_sequence_number.
+ // column_name -> (file_index, max_sequence_number)
+ let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let file_columns: Vec<String> = if let Some(ref wc) =
file_meta.write_cols {
Review Comment:
https://github.com/apache/paimon-rust/pull/193/changes/81665a180b6fd0d803fb2b6ea89951c47f8f9311
Do you mean these three lines of code? We should remove them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]