littlecoder04 commented on code in PR #241: URL: https://github.com/apache/paimon-rust/pull/241#discussion_r3080644921
########## crates/paimon/src/table/row_id_update_write.rs: ########## @@ -0,0 +1,693 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row-ID-based update writer for data evolution tables. +//! +//! [`RowIdUpdateWriter`] accepts rows to update (identified by `_ROW_ID`) along with +//! new column values, then handles file metadata lookup, row grouping, +//! reading original columns, applying updates, and writing partial-column files. +//! +//! The writer does NOT commit — it returns `Vec<CommitMessage>` that the caller +//! passes to [`TableCommit`](super::TableCommit). +//! This separation allows callers to compose multiple operations into a single commit, +//! similar to Iceberg's Transaction/Action pattern. 
+ +use crate::io::FileIO; +use crate::spec::{BinaryRow, CoreOptions, DataFileMeta, PartitionComputer}; +use crate::table::commit_message::CommitMessage; +use crate::table::data_file_writer::DataFileWriter; +use crate::table::DataSplitBuilder; +use crate::table::Table; +use crate::Result; +use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch}; +use arrow_select::concat::concat_batches; +use arrow_select::interleave::interleave; +use futures::TryStreamExt; +use std::collections::HashMap; + +/// Engine-agnostic writer for partial-column updates via `_ROW_ID`. +/// +/// Usage: +/// 1. Create via [`RowIdUpdateWriter::new`] (validates preconditions). +/// 2. Feed matched rows via [`add_matched_batch`](Self::add_matched_batch). +/// Each batch must contain a `_ROW_ID` (Int64) column plus the update columns. +/// 3. Call [`prepare_commit`](Self::prepare_commit) to produce `CommitMessage`s. +/// 4. Commit via [`TableCommit`](super::TableCommit) (caller's responsibility). +/// +/// The query engine (DataFusion, custom, etc.) is responsible for: +/// - Parsing the MERGE INTO SQL +/// - Executing the JOIN to find matched rows +/// - Computing new column values +/// - Passing the results as `RecordBatch`es with `_ROW_ID` + update columns +#[must_use = "writer must be used to call prepare_commit()"] +pub struct RowIdUpdateWriter { + table: Table, + update_columns: Vec<String>, + matched_batches: Vec<RecordBatch>, +} + +impl RowIdUpdateWriter { + /// Create a new writer for the given table and update columns. 
+ /// + /// Validates: + /// - `data-evolution.enabled = true` + /// - `row-tracking.enabled = true` + /// - No primary keys + /// - Update columns don't include partition keys + pub fn new(table: &Table, update_columns: Vec<String>) -> Result<Self> { + let schema = table.schema(); + let core_options = CoreOptions::new(schema.options()); + + if !core_options.data_evolution_enabled() { + return Err(crate::Error::Unsupported { + message: + "MERGE INTO is only supported for tables with 'data-evolution.enabled' = 'true'" + .to_string(), + }); + } + if !core_options.row_tracking_enabled() { + return Err(crate::Error::Unsupported { + message: "MERGE INTO requires 'row-tracking.enabled' = 'true'".to_string(), + }); + } + if !schema.primary_keys().is_empty() { + return Err(crate::Error::Unsupported { + message: "MERGE INTO on data evolution tables does not support primary keys" + .to_string(), + }); + } + + let partition_keys = schema.partition_keys(); + for col in &update_columns { + if partition_keys.contains(col) { + return Err(crate::Error::Unsupported { + message: format!("Cannot update partition column '{col}' in MERGE INTO"), + }); + } + } + + Ok(Self { + table: table.clone(), + update_columns, + matched_batches: Vec::new(), + }) + } + + /// Add a batch of matched rows. + /// + /// The batch must contain: + /// - A `_ROW_ID` column (Int64) identifying which rows to update + /// - One column for each entry in `update_columns` with the new values + pub fn add_matched_batch(&mut self, batch: RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + // Validate _ROW_ID column exists + if batch.column_by_name("_ROW_ID").is_none() { + return Err(crate::Error::DataInvalid { + message: "Matched batch must contain a '_ROW_ID' column".to_string(), + source: None, + }); + } + + self.matched_batches.push(batch); + Ok(()) + } + + /// Scan file metadata, group matched rows by file, read originals, + /// apply updates, and write partial-column files. 
+ /// + /// Returns `CommitMessage`s for the caller to commit via [`TableCommit`](super::TableCommit). + #[must_use = "commit messages must be passed to TableCommit"] + pub async fn prepare_commit(self) -> Result<Vec<CommitMessage>> { + let total_matched: usize = self.matched_batches.iter().map(|b| b.num_rows()).sum(); + if total_matched == 0 { + return Ok(Vec::new()); + } + + // 1. Scan file metadata and build row_id -> file index + let scan = self.table.new_read_builder().new_scan(); + let plan = scan.plan().await?; + + let mut file_index: Vec<FileRowRange> = Vec::new(); + for split in plan.splits() { + let partition_bytes = split.partition().to_serialized_bytes(); + let bucket = split.bucket(); + let bucket_path = split.bucket_path().to_string(); + let snapshot_id = split.snapshot_id(); + let total_buckets = split.total_buckets(); + for file in split.data_files() { + if let Some(first_row_id) = file.first_row_id { + file_index.push(FileRowRange { + first_row_id, + last_row_id: first_row_id + file.row_count - 1, + partition: partition_bytes.clone(), + bucket, + bucket_path: bucket_path.clone(), + snapshot_id, + total_buckets, + file: file.clone(), + }); + } + } + } + file_index.sort_by_key(|f| f.first_row_id); + + if file_index.is_empty() { + return Err(crate::Error::DataInvalid { + message: "No files with row tracking found in target table".to_string(), + source: None, + }); + } + + // 2. 
Group matched rows by their owning file + let mut file_matches: HashMap<usize, Vec<MatchedRow>> = HashMap::new(); + + for (batch_idx, batch) in self.matched_batches.iter().enumerate() { + let row_id_col = batch + .column_by_name("_ROW_ID") + .unwrap() + .as_any() + .downcast_ref::<Int64Array>() + .ok_or_else(|| crate::Error::DataInvalid { + message: "_ROW_ID column must be Int64".to_string(), + source: None, + })?; + + for row_idx in 0..batch.num_rows() { + let row_id = row_id_col.value(row_idx); + let (file_pos, file_range) = + find_owning_file(&file_index, row_id).ok_or_else(|| { + crate::Error::DataInvalid { + message: format!("No file found for _ROW_ID {row_id}"), + source: None, + } + })?; + + let offset = (row_id - file_range.first_row_id) as usize; + file_matches.entry(file_pos).or_default().push(MatchedRow { + offset, + batch_idx, + row_idx, + }); + } + } + + // 3. For each affected file: read original columns, apply updates, write partial files + let mut writer = PartialColumnsWriter::new(&self.table, self.update_columns.clone())?; + + for (&file_pos, matched_rows) in &file_matches { + let file_range = &file_index[file_pos]; + let file = &file_range.file; + let first_row_id = file.first_row_id.unwrap(); + let row_count = file.row_count as usize; + + // Read original columns from this file + let col_refs: Vec<&str> = self.update_columns.iter().map(|s| s.as_str()).collect(); + let mut rb = self.table.new_read_builder(); + rb.with_projection(&col_refs); + let read = rb.new_read()?; + + let split = DataSplitBuilder::new() + .with_snapshot(file_range.snapshot_id) + .with_partition(BinaryRow::from_serialized_bytes(&file_range.partition)?) + .with_bucket(file_range.bucket) + .with_bucket_path(file_range.bucket_path.clone()) + .with_total_buckets(file_range.total_buckets) + .with_data_files(vec![file.clone()]) Review Comment: At this point, `_ROW_ID` has already been narrowed to a single `DataFileMeta`, and this read only loads that file back. 
However, a logical row range can span multiple files that share the same `first_row_id` (the base file plus any partial-column files). Reading back only this one file may therefore return stale or missing values for columns that a later partial-column file has already overwritten. The upstream Java/Python implementations appear to read the whole `first_row_id` group instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
