vustef commented on code in PR #2219:
URL: https://github.com/apache/iceberg-rust/pull/2219#discussion_r2904816875

##########
crates/iceberg/src/transaction/row_delta.rs:
##########

Review Comment:
   Given that this change is bigger, and smaller changes are more manageable, my 2c is to help the other PR (https://github.com/apache/iceberg-rust/pull/2203) by reviewing it, and then potentially building upon it.

##########
crates/iceberg/src/writer/combined_writer/mod.rs:
##########

Review Comment:
   Apparently this file is missing a license header.

##########
crates/iceberg/src/writer/base_writer/position_delete_writer.rs:
##########

Review Comment:
   Can you please run `cargo fmt --all` with a corresponding cargo version?

##########
crates/catalog/rest/src/catalog.rs:
##########
@@ -716,6 +716,13 @@ impl Catalog for RestCatalog {
             })
             .build()?;
+        println!(

Review Comment:
   I assume you used this for some debugging and it should be removed?

##########
crates/iceberg/src/transaction/row_delta.rs:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Row delta transaction for encoding row-level changes to Iceberg tables.
+//!
+//! This module provides the `RowDeltaAction` which enables atomic application of row-level
+//! modifications including inserts, updates, and deletes. It supports both position deletes
+//! (for specific row locations) and equality deletes (for rows matching equality conditions).
+//!
+//! This is the appropriate transaction type for CDC (Change Data Capture) ingestion,
+//! upsert operations, and row-level deletions.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataContentType, DataFile, ManifestEntry, ManifestFile, Operation};
+use crate::table::Table;
+use crate::transaction::snapshot::{
+    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+};
+use crate::transaction::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind};
+
+/// RowDeltaAction is a transaction action for encoding row-level changes to a table.
+///
+/// This action supports:
+/// - Adding new data files
+/// - Adding delete files (both position and equality deletes)
+///
+/// This is the appropriate action to use for:
+/// - CDC (Change Data Capture) ingestion
+/// - Upsert operations
+/// - Row-level deletions
+///
+/// # Example
+/// ```ignore
+/// use iceberg::transaction::Transaction;
+///
+/// let tx = Transaction::new(&table);
+/// let action = tx.row_delta()
+///     .add_data_files(new_data_files)
+///     .add_delete_files(equality_delete_files);
+/// let tx = action.apply(tx).unwrap();
+/// let table = tx.commit(&catalog).await.unwrap();
+/// ```
+pub struct RowDeltaAction {
+    check_duplicate: bool,
+    // below are properties used to create SnapshotProducer when commit
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+}
+
+impl RowDeltaAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            check_duplicate: true,
+            commit_uuid: None,
+            key_metadata: None,
+            snapshot_properties: HashMap::default(),
+            added_data_files: vec![],
+            added_delete_files: vec![],
+        }
+    }
+
+    /// Set whether to check duplicate files
+    pub fn with_check_duplicate(mut self, v: bool) -> Self {
+        self.check_duplicate = v;
+        self
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.added_data_files.extend(data_files);
+        self
+    }
+
+    /// Add delete files to the snapshot.
+    ///
+    /// Delete files can be either position deletes or equality deletes.
+    /// The content type of each file will be validated.
+    pub fn add_delete_files(mut self, delete_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.added_delete_files.extend(delete_files);
+        self
+    }
+
+    /// Set commit UUID for the snapshot.
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set key metadata for manifest files.
+    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(key_metadata);
+        self
+    }
+
+    /// Set snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
+    }
+
+    /// Validate that delete files have appropriate content types
+    fn validate_delete_files(delete_files: &[DataFile]) -> Result<()> {
+        for delete_file in delete_files {
+            match delete_file.content_type() {
+                DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
+                    // Valid delete file types
+                }
+                DataContentType::Data => {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "File {} has content type Data but was added as a delete file. Use add_data_files() instead.",
+                            delete_file.file_path()
+                        ),
+                    ));
+                }
+            }
+
+            // Additional validation for equality deletes
+            if delete_file.content_type() == DataContentType::EqualityDeletes
+                && delete_file.equality_ids().is_none()
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Equality delete file {} must have equality_ids set",
+                        delete_file.file_path()
+                    ),
+                ));
+            }
+        }
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl TransactionAction for RowDeltaAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        // Validate delete files have correct content types
+        Self::validate_delete_files(&self.added_delete_files)?;
+
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            self.key_metadata.clone(),
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+            self.added_delete_files.clone(),
+        );
+
+        // Validate added data files (partition specs, etc.)
+        if !self.added_data_files.is_empty() {
+            snapshot_producer.validate_added_data_files(&self.added_data_files)?;
+        }
+
+        // Validate added delete files (partition specs, etc.)
+        if !self.added_delete_files.is_empty() {
+            snapshot_producer.validate_added_data_files(&self.added_delete_files)?;
+        }
+
+        // Check duplicate files
+        if self.check_duplicate {
+            snapshot_producer.validate_duplicate_files().await?;
+        }
+
+        snapshot_producer
+            .commit(RowDeltaOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct RowDeltaOperation;

Review Comment:
   The action in the other PR supports deleting data files (I assume for copy-on-write updates and for data file deletes). It'd be good to have that. What do you think?

##########
crates/iceberg/src/writer/base_writer/position_delete_writer.rs:
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Position delete writer for encoding row-level deletions in Iceberg tables.
+//!
+//! Position deletes identify rows to delete by their file path and row position within that file.
+//! This is more efficient than equality deletes when the exact location of rows to delete is known,
+//! such as during CDC processing or when tracking rows written within the same transaction.
+
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::spec::{DataFile, PartitionKey, Struct};
+use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
+use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::{IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
+
+/// Builder for `PositionDeleteWriter`.
+#[derive(Debug)]
+pub struct PositionDeleteFileWriterBuilder<
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+> {
+    inner: RollingFileWriterBuilder<B, L, F>,
+    config: PositionDeleteWriterConfig,
+}
+
+impl<B, L, F> PositionDeleteFileWriterBuilder<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
+    /// Create a new `PositionDeleteFileWriterBuilder` using a `RollingFileWriterBuilder`.
+    pub fn new(
+        inner: RollingFileWriterBuilder<B, L, F>,
+        config: PositionDeleteWriterConfig,
+    ) -> Self {
+        Self { inner, config }
+    }
+}
+
+/// Config for `PositionDeleteWriter`.
+#[derive(Clone, Debug)]
+pub struct PositionDeleteWriterConfig {
+    partition_value: Struct,
+    partition_spec_id: i32,
+    referenced_data_file: Option<String>,
+}
+
+impl PositionDeleteWriterConfig {
+    /// Create a new `PositionDeleteWriterConfig`.
+    ///
+    /// # Arguments
+    /// * `partition_value` - The partition value for the delete file
+    /// * `partition_spec_id` - The partition spec ID
+    /// * `referenced_data_file` - Optional path to the data file being deleted from
+    pub fn new(
+        partition_value: Option<Struct>,
+        partition_spec_id: i32,
+        referenced_data_file: Option<String>,
+    ) -> Self {
+        Self {
+            partition_value: partition_value.unwrap_or(Struct::empty()),
+            partition_spec_id,
+            referenced_data_file,
+        }
+    }
+
+    /// Returns the Arrow schema for position delete files.
+    ///
+    /// Position delete files have a fixed schema:
+    /// - file_path: string (field id 2147483546)
+    /// - pos: long (field id 2147483545)
+    pub fn arrow_schema() -> ArrowSchemaRef {
+        Arc::new(ArrowSchema::new(vec![
+            Field::new("file_path", DataType::Utf8, false).with_metadata(

Review Comment:
   We have consts for this: `RESERVED_COL_NAME_DELETE_FILE_PATH` for the name, and `RESERVED_FIELD_ID_DELETE_FILE_PATH` for the field ID. Or rather, the whole `FILE_PATH_FIELD` can perhaps be reused. Same for `pos`.

##########
crates/iceberg/src/writer/combined_writer/delta_writer.rs:
##########

Review Comment:
   I'm curious what @liurenjie1024's opinion is on where this thing should live. This is a higher-level API than e.g. the position delete writer or the equality delete writer. It is not the only possible way of combining these (e.g. some might want to switch to copy-on-write if more than 10% of a file is changed, similar to Snowflake's heuristic here: https://docs.snowflake.com/en/user-guide/tables-iceberg-manage#usage-notes-for-deletion-vectors). Therefore, it might not generalize well, and perhaps it should either be part of some other crate, or be structured so that there's a factory of different higher-level writers based on different strategies, possibly extensible outside of iceberg-rust.

   I still think there's value in this being in this repository, otherwise other people will have a hard time discovering and reusing functionality that they like.

   Another thing to note is that if we're writing Iceberg V3 in the future, we shouldn't write positional delete files but deletion vectors.
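
To make the "factory of strategies" idea a bit more concrete, here is a rough, self-contained sketch. Every name in it (`WriteMode`, `DeltaStrategy`, `ThresholdStrategy`, `strategy_for`, and the strategy keys) is hypothetical and not an existing iceberg-rust API; it only illustrates how the merge-on-read versus copy-on-write choice could sit behind a trait that code outside this repository might implement too.

```rust
/// How row-level changes to a single data file get materialized.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    /// Keep the data file and write delete files alongside it.
    MergeOnRead,
    /// Rewrite the data file without the deleted rows.
    CopyOnWrite,
}

/// Hypothetical extension point: each strategy decides per data file.
pub trait DeltaStrategy {
    fn choose(&self, deleted_rows: u64, total_rows: u64) -> WriteMode;
}

/// Always writes delete files, regardless of how much of the file changed.
pub struct AlwaysMergeOnRead;

impl DeltaStrategy for AlwaysMergeOnRead {
    fn choose(&self, _deleted_rows: u64, _total_rows: u64) -> WriteMode {
        WriteMode::MergeOnRead
    }
}

/// Switches to copy-on-write once the deleted fraction exceeds a threshold.
pub struct ThresholdStrategy {
    pub max_deleted_fraction: f64,
}

impl DeltaStrategy for ThresholdStrategy {
    fn choose(&self, deleted_rows: u64, total_rows: u64) -> WriteMode {
        // An empty file has nothing to rewrite, so stay with merge-on-read.
        if total_rows == 0 {
            return WriteMode::MergeOnRead;
        }
        let fraction = deleted_rows as f64 / total_rows as f64;
        if fraction > self.max_deleted_fraction {
            WriteMode::CopyOnWrite
        } else {
            WriteMode::MergeOnRead
        }
    }
}

/// Minimal factory keyed by a strategy name; unknown names yield `None`.
pub fn strategy_for(name: &str) -> Option<Box<dyn DeltaStrategy>> {
    match name {
        "merge-on-read" => Some(Box::new(AlwaysMergeOnRead)),
        "threshold-10pct" => Some(Box::new(ThresholdStrategy {
            max_deleted_fraction: 0.10,
        })),
        _ => None,
    }
}
```

With something along these lines, `strategy_for("threshold-10pct")` would pick merge-on-read for 5 deleted rows out of 100 and copy-on-write for 25 out of 100, and a delta writer could take a `Box<dyn DeltaStrategy>` instead of hard-coding one behaviour.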
