DAlperin commented on code in PR #2203:
URL: https://github.com/apache/iceberg-rust/pull/2203#discussion_r2941114573


##########
crates/iceberg/src/transaction/row_delta.rs:
##########
@@ -0,0 +1,491 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, ManifestStatus, 
Operation};
+use crate::table::Table;
+use crate::transaction::snapshot::{
+    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+};
+use crate::transaction::{ActionCommit, TransactionAction};
+
+/// RowDeltaAction handles both data file additions and deletions in a single 
snapshot.
+/// This is the core transaction type for MERGE, UPDATE, DELETE operations.
+///
+/// Corresponds to `org.apache.iceberg.RowDelta` in the Java implementation.
+///
+/// # Copy-on-Write (COW) Strategy
+///
+/// For row-level modifications:
+/// 1. Read target data files that contain rows to be modified
+/// 2. Apply modifications (UPDATE/DELETE logic)
+/// 3. Write modified rows to new data files via `add_data_files()`
+/// 4. Mark original files as deleted via `remove_data_files()`
+///
+/// For inserts (NOT MATCHED in MERGE):
+/// 1. Write new rows to data files
+/// 2. Add files via `add_data_files()`
+///
+/// # Future: Merge-on-Read (MOR) Strategy
+///
+/// The `add_delete_files()` method is reserved for future MOR support, which 
uses
+/// delete files instead of rewriting data files.
+pub struct RowDeltaAction {
+    /// New data files to add (for inserts or rewritten files in COW mode)
+    added_data_files: Vec<DataFile>,
+    /// Data files to mark as deleted (for COW mode when rewriting files)
+    removed_data_files: Vec<DataFile>,
+    /// Delete files to add (reserved for future MOR mode support)
+    added_delete_files: Vec<DataFile>,
+    /// Optional commit UUID for manifest file naming
+    commit_uuid: Option<Uuid>,
+    /// Additional properties to add to snapshot summary
+    snapshot_properties: HashMap<String, String>,
+    /// Optional starting snapshot ID for conflict detection
+    starting_snapshot_id: Option<i64>,
+}
+
+impl RowDeltaAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            added_data_files: vec![],
+            removed_data_files: vec![],
+            added_delete_files: vec![],
+            commit_uuid: None,
+            snapshot_properties: HashMap::default(),
+            starting_snapshot_id: None,
+        }
+    }
+
+    /// Add new data files to the snapshot.
+    ///
+    /// Used for:
+    /// - New rows from INSERT operations
+    /// - Rewritten data files in COW mode (after applying UPDATE/DELETE)
+    ///
+    /// Corresponds to `addRows(DataFile)` in Java implementation.
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = 
DataFile>) -> Self {
+        self.added_data_files.extend(data_files);
+        self
+    }
+
+    /// Mark data files as deleted in the snapshot.
+    ///
+    /// Used in COW mode to mark original files as deleted when they've been 
rewritten
+    /// with modifications.
+    ///
+    /// Corresponds to `removeRows(DataFile)` in Java implementation.
+    pub fn remove_data_files(mut self, data_files: impl IntoIterator<Item = 
DataFile>) -> Self {
+        self.removed_data_files.extend(data_files);
+        self
+    }
+
+    /// Add delete files to the snapshot (reserved for future MOR mode).
+    ///
+    /// Corresponds to `addDeletes(DeleteFile)` in Java implementation.
+    ///
+    /// # Note
+    ///
+    /// This is not yet implemented and is reserved for future Merge-on-Read 
(MOR)
+    /// optimization where delete files are used instead of rewriting data 
files.
+    pub fn add_delete_files(mut self, delete_files: impl IntoIterator<Item = 
DataFile>) -> Self {
+        self.added_delete_files.extend(delete_files);
+        self
+    }
+
+    /// Set commit UUID for the snapshot.
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, snapshot_properties: 
HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
+    }
+
+    /// Validate that the operation is applied on top of a specific snapshot.
+    ///
+    /// This can be used for conflict detection in concurrent modification 
scenarios.
+    ///
+    /// Corresponds to `validateFromSnapshot(long snapshotId)` in Java 
implementation.
+    pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self {
+        self.starting_snapshot_id = Some(snapshot_id);
+        self
+    }
+}
+
+#[async_trait]
+impl TransactionAction for RowDeltaAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        // Validate starting snapshot if specified
+        if let Some(expected_snapshot_id) = self.starting_snapshot_id
+            && table.metadata().current_snapshot_id() != 
Some(expected_snapshot_id)
+        {
+            return Err(crate::Error::new(
+                crate::ErrorKind::DataInvalid,
+                format!(
+                    "Cannot commit RowDelta based on stale snapshot. Expected: 
{}, Current: {:?}",
+                    expected_snapshot_id,
+                    table.metadata().current_snapshot_id()
+                ),
+            ));
+        }
+
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),

Review Comment:
   It strikes me that lexicographic sorting of snapshot IDs is a positive trait, 
and we should prefer it where possible unless there is a compelling reason 
otherwise, but I defer to the maintainers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to