This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new bf149d3f refactor: Split transaction module (#1080)
bf149d3f is described below

commit bf149d3f7713b50cc7c36e22bafdc4f84de8abd9
Author: Jonathan Chen <[email protected]>
AuthorDate: Thu Mar 13 21:39:16 2025 -0400

    refactor: Split transaction module (#1080)
    
    ## Which issue does this PR close?
    
    - Closes #980 .
    
    ## What changes are included in this PR?
    
    Split transactions module
---
 crates/iceberg/src/transaction.rs            | 1046 --------------------------
 crates/iceberg/src/transaction/append.rs     |  373 +++++++++
 crates/iceberg/src/transaction/mod.rs        |  326 ++++++++
 crates/iceberg/src/transaction/snapshot.rs   |  309 ++++++++
 crates/iceberg/src/transaction/sort_order.rs |  132 ++++
 5 files changed, 1140 insertions(+), 1046 deletions(-)

diff --git a/crates/iceberg/src/transaction.rs 
b/crates/iceberg/src/transaction.rs
deleted file mode 100644
index 15d5c99a..00000000
--- a/crates/iceberg/src/transaction.rs
+++ /dev/null
@@ -1,1046 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This module contains transaction api.
-
-use std::cmp::Ordering;
-use std::collections::{HashMap, HashSet};
-use std::future::Future;
-use std::mem::discriminant;
-use std::ops::RangeFrom;
-
-use arrow_array::StringArray;
-use futures::TryStreamExt;
-use uuid::Uuid;
-
-use crate::error::Result;
-use crate::io::OutputFile;
-use crate::spec::{
-    DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, 
ManifestListWriter,
-    ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, 
SnapshotRetention,
-    SortDirection, SortField, SortOrder, Struct, StructType, Summary, 
Transform, MAIN_BRANCH,
-};
-use crate::table::Table;
-use crate::writer::file_writer::ParquetWriter;
-use crate::TableUpdate::UpgradeFormatVersion;
-use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, 
TableUpdate};
-
-const META_ROOT_PATH: &str = "metadata";
-
-/// Table transaction.
-pub struct Transaction<'a> {
-    table: &'a Table,
-    updates: Vec<TableUpdate>,
-    requirements: Vec<TableRequirement>,
-}
-
-impl<'a> Transaction<'a> {
-    /// Creates a new transaction.
-    pub fn new(table: &'a Table) -> Self {
-        Self {
-            table,
-            updates: vec![],
-            requirements: vec![],
-        }
-    }
-
-    fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
-        for update in &updates {
-            for up in &self.updates {
-                if discriminant(up) == discriminant(update) {
-                    return Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!(
-                            "Cannot apply update with same type at same time: 
{:?}",
-                            update
-                        ),
-                    ));
-                }
-            }
-        }
-        self.updates.extend(updates);
-        Ok(())
-    }
-
-    fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> 
Result<()> {
-        self.requirements.extend(requirements);
-        Ok(())
-    }
-
-    /// Sets table to a new version.
-    pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
-        let current_version = self.table.metadata().format_version();
-        match current_version.cmp(&format_version) {
-            Ordering::Greater => {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    format!(
-                        "Cannot downgrade table version from {} to {}",
-                        current_version, format_version
-                    ),
-                ));
-            }
-            Ordering::Less => {
-                self.append_updates(vec![UpgradeFormatVersion { format_version 
}])?;
-            }
-            Ordering::Equal => {
-                // Do nothing.
-            }
-        }
-        Ok(self)
-    }
-
-    /// Update table's property.
-    pub fn set_properties(mut self, props: HashMap<String, String>) -> 
Result<Self> {
-        self.append_updates(vec![TableUpdate::SetProperties { updates: props 
}])?;
-        Ok(self)
-    }
-
-    fn generate_unique_snapshot_id(&self) -> i64 {
-        let generate_random_id = || -> i64 {
-            let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
-            let snapshot_id = (lhs ^ rhs) as i64;
-            if snapshot_id < 0 {
-                -snapshot_id
-            } else {
-                snapshot_id
-            }
-        };
-        let mut snapshot_id = generate_random_id();
-        while self
-            .table
-            .metadata()
-            .snapshots()
-            .any(|s| s.snapshot_id() == snapshot_id)
-        {
-            snapshot_id = generate_random_id();
-        }
-        snapshot_id
-    }
-
-    /// Creates a fast append action.
-    pub fn fast_append(
-        self,
-        commit_uuid: Option<Uuid>,
-        key_metadata: Vec<u8>,
-    ) -> Result<FastAppendAction<'a>> {
-        let snapshot_id = self.generate_unique_snapshot_id();
-        FastAppendAction::new(
-            self,
-            snapshot_id,
-            commit_uuid.unwrap_or_else(Uuid::now_v7),
-            key_metadata,
-            HashMap::new(),
-        )
-    }
-
-    /// Creates replace sort order action.
-    pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
-        ReplaceSortOrderAction {
-            tx: self,
-            sort_fields: vec![],
-        }
-    }
-
-    /// Remove properties in table.
-    pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
-        self.append_updates(vec![TableUpdate::RemoveProperties { removals: 
keys }])?;
-        Ok(self)
-    }
-
-    /// Commit transaction.
-    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
-        let table_commit = TableCommit::builder()
-            .ident(self.table.identifier().clone())
-            .updates(self.updates)
-            .requirements(self.requirements)
-            .build();
-
-        catalog.update_table(table_commit).await
-    }
-}
-
-/// FastAppendAction is a transaction action for fast append data files to the 
table.
-pub struct FastAppendAction<'a> {
-    snapshot_produce_action: SnapshotProduceAction<'a>,
-    check_duplicate: bool,
-}
-
-impl<'a> FastAppendAction<'a> {
-    #[allow(clippy::too_many_arguments)]
-    pub(crate) fn new(
-        tx: Transaction<'a>,
-        snapshot_id: i64,
-        commit_uuid: Uuid,
-        key_metadata: Vec<u8>,
-        snapshot_properties: HashMap<String, String>,
-    ) -> Result<Self> {
-        Ok(Self {
-            snapshot_produce_action: SnapshotProduceAction::new(
-                tx,
-                snapshot_id,
-                key_metadata,
-                commit_uuid,
-                snapshot_properties,
-            )?,
-            check_duplicate: true,
-        })
-    }
-
-    /// Set whether to check duplicate files
-    pub fn with_check_duplicate(mut self, v: bool) -> Self {
-        self.check_duplicate = v;
-        self
-    }
-
-    /// Add data files to the snapshot.
-    pub fn add_data_files(
-        &mut self,
-        data_files: impl IntoIterator<Item = DataFile>,
-    ) -> Result<&mut Self> {
-        self.snapshot_produce_action.add_data_files(data_files)?;
-        Ok(self)
-    }
-
-    /// Adds existing parquet files
-    #[allow(dead_code)]
-    async fn add_parquet_files(mut self, file_path: Vec<String>) -> 
Result<Transaction<'a>> {
-        if !self
-            .snapshot_produce_action
-            .tx
-            .table
-            .metadata()
-            .default_spec
-            .is_unpartitioned()
-        {
-            return Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Appending to partitioned tables is not supported",
-            ));
-        }
-
-        let table_metadata = self.snapshot_produce_action.tx.table.metadata();
-
-        let data_files = ParquetWriter::parquet_files_to_data_files(
-            self.snapshot_produce_action.tx.table.file_io(),
-            file_path,
-            table_metadata,
-        )
-        .await?;
-
-        self.add_data_files(data_files)?;
-
-        self.apply().await
-    }
-
-    /// Finished building the action and apply it to the transaction.
-    pub async fn apply(self) -> Result<Transaction<'a>> {
-        // Checks duplicate files
-        if self.check_duplicate {
-            let new_files: HashSet<&str> = self
-                .snapshot_produce_action
-                .added_data_files
-                .iter()
-                .map(|df| df.file_path.as_str())
-                .collect();
-
-            let mut manifest_stream = self
-                .snapshot_produce_action
-                .tx
-                .table
-                .inspect()
-                .manifests()
-                .scan()
-                .await?;
-            let mut referenced_files = Vec::new();
-
-            while let Some(batch) = manifest_stream.try_next().await? {
-                let file_path_array = batch
-                    .column(1)
-                    .as_any()
-                    .downcast_ref::<StringArray>()
-                    .ok_or_else(|| {
-                        Error::new(
-                            ErrorKind::DataInvalid,
-                            "Failed to downcast file_path column to 
StringArray",
-                        )
-                    })?;
-
-                for i in 0..batch.num_rows() {
-                    let file_path = file_path_array.value(i);
-                    if new_files.contains(file_path) {
-                        referenced_files.push(file_path.to_string());
-                    }
-                }
-            }
-
-            if !referenced_files.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    format!(
-                        "Cannot add files that are already referenced by 
table, files: {}",
-                        referenced_files.join(", ")
-                    ),
-                ));
-            }
-        }
-
-        self.snapshot_produce_action
-            .apply(FastAppendOperation, DefaultManifestProcess)
-            .await
-    }
-}
-
-struct FastAppendOperation;
-
-impl SnapshotProduceOperation for FastAppendOperation {
-    fn operation(&self) -> Operation {
-        Operation::Append
-    }
-
-    async fn delete_entries(
-        &self,
-        _snapshot_produce: &SnapshotProduceAction<'_>,
-    ) -> Result<Vec<ManifestEntry>> {
-        Ok(vec![])
-    }
-
-    async fn existing_manifest(
-        &self,
-        snapshot_produce: &SnapshotProduceAction<'_>,
-    ) -> Result<Vec<ManifestFile>> {
-        let Some(snapshot) = 
snapshot_produce.tx.table.metadata().current_snapshot() else {
-            return Ok(vec![]);
-        };
-
-        let manifest_list = snapshot
-            .load_manifest_list(
-                snapshot_produce.tx.table.file_io(),
-                &snapshot_produce.tx.table.metadata_ref(),
-            )
-            .await?;
-
-        Ok(manifest_list
-            .entries()
-            .iter()
-            .filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
-            .cloned()
-            .collect())
-    }
-}
-
-trait SnapshotProduceOperation: Send + Sync {
-    fn operation(&self) -> Operation;
-    #[allow(unused)]
-    fn delete_entries(
-        &self,
-        snapshot_produce: &SnapshotProduceAction,
-    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
-    fn existing_manifest(
-        &self,
-        snapshot_produce: &SnapshotProduceAction,
-    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
-}
-
-struct DefaultManifestProcess;
-
-impl ManifestProcess for DefaultManifestProcess {
-    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> 
Vec<ManifestFile> {
-        manifests
-    }
-}
-
-trait ManifestProcess: Send + Sync {
-    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> 
Vec<ManifestFile>;
-}
-
-struct SnapshotProduceAction<'a> {
-    tx: Transaction<'a>,
-    snapshot_id: i64,
-    key_metadata: Vec<u8>,
-    commit_uuid: Uuid,
-    snapshot_properties: HashMap<String, String>,
-    added_data_files: Vec<DataFile>,
-    // A counter used to generate unique manifest file names.
-    // It starts from 0 and increments for each new manifest file.
-    // Note: This counter is limited to the range of (0..u64::MAX).
-    manifest_counter: RangeFrom<u64>,
-}
-
-impl<'a> SnapshotProduceAction<'a> {
-    pub(crate) fn new(
-        tx: Transaction<'a>,
-        snapshot_id: i64,
-        key_metadata: Vec<u8>,
-        commit_uuid: Uuid,
-        snapshot_properties: HashMap<String, String>,
-    ) -> Result<Self> {
-        Ok(Self {
-            tx,
-            snapshot_id,
-            commit_uuid,
-            snapshot_properties,
-            added_data_files: vec![],
-            manifest_counter: (0..),
-            key_metadata,
-        })
-    }
-
-    // Check if the partition value is compatible with the partition type.
-    fn validate_partition_value(
-        partition_value: &Struct,
-        partition_type: &StructType,
-    ) -> Result<()> {
-        if partition_value.fields().len() != partition_type.fields().len() {
-            return Err(Error::new(
-                ErrorKind::DataInvalid,
-                "Partition value is not compatible with partition type",
-            ));
-        }
-
-        for (value, field) in 
partition_value.fields().iter().zip(partition_type.fields()) {
-            if !field
-                .field_type
-                .as_primitive_type()
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::Unexpected,
-                        "Partition field should only be primitive type.",
-                    )
-                })?
-                .compatible(&value.as_primitive_literal().unwrap())
-            {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    "Partition value is not compatible partition type",
-                ));
-            }
-        }
-        Ok(())
-    }
-
-    /// Add data files to the snapshot.
-    pub fn add_data_files(
-        &mut self,
-        data_files: impl IntoIterator<Item = DataFile>,
-    ) -> Result<&mut Self> {
-        let data_files: Vec<DataFile> = data_files.into_iter().collect();
-        for data_file in &data_files {
-            if data_file.content_type() != crate::spec::DataContentType::Data {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    "Only data content type is allowed for fast append",
-                ));
-            }
-            Self::validate_partition_value(
-                data_file.partition(),
-                self.tx.table.metadata().default_partition_type(),
-            )?;
-        }
-        self.added_data_files.extend(data_files);
-        Ok(self)
-    }
-
-    fn new_manifest_output(&mut self) -> Result<OutputFile> {
-        let new_manifest_path = format!(
-            "{}/{}/{}-m{}.{}",
-            self.tx.table.metadata().location(),
-            META_ROOT_PATH,
-            self.commit_uuid,
-            self.manifest_counter.next().unwrap(),
-            DataFileFormat::Avro
-        );
-        self.tx.table.file_io().new_output(new_manifest_path)
-    }
-
-    // Write manifest file for added data files and return the ManifestFile 
for ManifestList.
-    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
-        let added_data_files = std::mem::take(&mut self.added_data_files);
-        let snapshot_id = self.snapshot_id;
-        let manifest_entries = added_data_files.into_iter().map(|data_file| {
-            let builder = ManifestEntry::builder()
-                .status(crate::spec::ManifestStatus::Added)
-                .data_file(data_file);
-            if self.tx.table.metadata().format_version() == FormatVersion::V1 {
-                builder.snapshot_id(snapshot_id).build()
-            } else {
-                // For format version > 1, we set the snapshot id at the 
inherited time to avoid rewrite the manifest file when
-                // commit failed.
-                builder.build()
-            }
-        });
-        let mut writer = {
-            let builder = ManifestWriterBuilder::new(
-                self.new_manifest_output()?,
-                Some(self.snapshot_id),
-                self.key_metadata.clone(),
-                self.tx.table.metadata().current_schema().clone(),
-                self.tx
-                    .table
-                    .metadata()
-                    .default_partition_spec()
-                    .as_ref()
-                    .clone(),
-            );
-            if self.tx.table.metadata().format_version() == FormatVersion::V1 {
-                builder.build_v1()
-            } else {
-                builder.build_v2_data()
-            }
-        };
-        for entry in manifest_entries {
-            writer.add_entry(entry)?;
-        }
-        writer.write_manifest_file().await
-    }
-
-    async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
-        &mut self,
-        snapshot_produce_operation: &OP,
-        manifest_process: &MP,
-    ) -> Result<Vec<ManifestFile>> {
-        let added_manifest = self.write_added_manifest().await?;
-        let existing_manifests = 
snapshot_produce_operation.existing_manifest(self).await?;
-        // # TODO
-        // Support process delete entries.
-
-        let mut manifest_files = vec![added_manifest];
-        manifest_files.extend(existing_manifests);
-        let manifest_files = 
manifest_process.process_manifeset(manifest_files);
-        Ok(manifest_files)
-    }
-
-    // # TODO
-    // Fulfill this function
-    fn summary<OP: SnapshotProduceOperation>(&self, 
snapshot_produce_operation: &OP) -> Summary {
-        Summary {
-            operation: snapshot_produce_operation.operation(),
-            additional_properties: self.snapshot_properties.clone(),
-        }
-    }
-
-    fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
-        format!(
-            "{}/{}/snap-{}-{}-{}.{}",
-            self.tx.table.metadata().location(),
-            META_ROOT_PATH,
-            self.snapshot_id,
-            attempt,
-            self.commit_uuid,
-            DataFileFormat::Avro
-        )
-    }
-
-    /// Finished building the action and apply it to the transaction.
-    pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
-        mut self,
-        snapshot_produce_operation: OP,
-        process: MP,
-    ) -> Result<Transaction<'a>> {
-        let new_manifests = self
-            .manifest_file(&snapshot_produce_operation, &process)
-            .await?;
-        let next_seq_num = self.tx.table.metadata().next_sequence_number();
-
-        let summary = self.summary(&snapshot_produce_operation);
-
-        let manifest_list_path = self.generate_manifest_list_file_path(0);
-
-        let mut manifest_list_writer = match 
self.tx.table.metadata().format_version() {
-            FormatVersion::V1 => ManifestListWriter::v1(
-                self.tx
-                    .table
-                    .file_io()
-                    .new_output(manifest_list_path.clone())?,
-                self.snapshot_id,
-                self.tx.table.metadata().current_snapshot_id(),
-            ),
-            FormatVersion::V2 => ManifestListWriter::v2(
-                self.tx
-                    .table
-                    .file_io()
-                    .new_output(manifest_list_path.clone())?,
-                self.snapshot_id,
-                self.tx.table.metadata().current_snapshot_id(),
-                next_seq_num,
-            ),
-        };
-        manifest_list_writer.add_manifests(new_manifests.into_iter())?;
-        manifest_list_writer.close().await?;
-
-        let commit_ts = chrono::Utc::now().timestamp_millis();
-        let new_snapshot = Snapshot::builder()
-            .with_manifest_list(manifest_list_path)
-            .with_snapshot_id(self.snapshot_id)
-            
.with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
-            .with_sequence_number(next_seq_num)
-            .with_summary(summary)
-            .with_schema_id(self.tx.table.metadata().current_schema_id())
-            .with_timestamp_ms(commit_ts)
-            .build();
-
-        self.tx.append_updates(vec![
-            TableUpdate::AddSnapshot {
-                snapshot: new_snapshot,
-            },
-            TableUpdate::SetSnapshotRef {
-                ref_name: MAIN_BRANCH.to_string(),
-                reference: SnapshotReference::new(
-                    self.snapshot_id,
-                    SnapshotRetention::branch(None, None, None),
-                ),
-            },
-        ])?;
-        self.tx.append_requirements(vec![
-            TableRequirement::UuidMatch {
-                uuid: self.tx.table.metadata().uuid(),
-            },
-            TableRequirement::RefSnapshotIdMatch {
-                r#ref: MAIN_BRANCH.to_string(),
-                snapshot_id: self.tx.table.metadata().current_snapshot_id(),
-            },
-        ])?;
-        Ok(self.tx)
-    }
-}
-
-/// Transaction action for replacing sort order.
-pub struct ReplaceSortOrderAction<'a> {
-    tx: Transaction<'a>,
-    sort_fields: Vec<SortField>,
-}
-
-impl<'a> ReplaceSortOrderAction<'a> {
-    /// Adds a field for sorting in ascending order.
-    pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
-        self.add_sort_field(name, SortDirection::Ascending, null_order)
-    }
-
-    /// Adds a field for sorting in descending order.
-    pub fn desc(self, name: &str, null_order: NullOrder) -> Result<Self> {
-        self.add_sort_field(name, SortDirection::Descending, null_order)
-    }
-
-    /// Finished building the action and apply it to the transaction.
-    pub fn apply(mut self) -> Result<Transaction<'a>> {
-        let unbound_sort_order = SortOrder::builder()
-            .with_fields(self.sort_fields)
-            .build_unbound()?;
-
-        let updates = vec![
-            TableUpdate::AddSortOrder {
-                sort_order: unbound_sort_order,
-            },
-            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
-        ];
-
-        let requirements = vec![
-            TableRequirement::CurrentSchemaIdMatch {
-                current_schema_id: 
self.tx.table.metadata().current_schema().schema_id(),
-            },
-            TableRequirement::DefaultSortOrderIdMatch {
-                default_sort_order_id: 
self.tx.table.metadata().default_sort_order().order_id,
-            },
-        ];
-
-        self.tx.append_requirements(requirements)?;
-        self.tx.append_updates(updates)?;
-        Ok(self.tx)
-    }
-
-    fn add_sort_field(
-        mut self,
-        name: &str,
-        sort_direction: SortDirection,
-        null_order: NullOrder,
-    ) -> Result<Self> {
-        let field_id = self
-            .tx
-            .table
-            .metadata()
-            .current_schema()
-            .field_id_by_name(name)
-            .ok_or_else(|| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    format!("Cannot find field {} in table schema", name),
-                )
-            })?;
-
-        let sort_field = SortField::builder()
-            .source_id(field_id)
-            .transform(Transform::Identity)
-            .direction(sort_direction)
-            .null_order(null_order)
-            .build();
-
-        self.sort_fields.push(sort_field);
-        Ok(self)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::HashMap;
-    use std::fs::File;
-    use std::io::BufReader;
-
-    use crate::io::FileIOBuilder;
-    use crate::scan::tests::TableTestFixture;
-    use crate::spec::{
-        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, 
Literal, Struct,
-        TableMetadata,
-    };
-    use crate::table::Table;
-    use crate::transaction::{Transaction, MAIN_BRANCH};
-    use crate::{TableIdent, TableRequirement, TableUpdate};
-
-    fn make_v1_table() -> Table {
-        let file = File::open(format!(
-            "{}/testdata/table_metadata/{}",
-            env!("CARGO_MANIFEST_DIR"),
-            "TableMetadataV1Valid.json"
-        ))
-        .unwrap();
-        let reader = BufReader::new(file);
-        let resp = serde_json::from_reader::<_, 
TableMetadata>(reader).unwrap();
-
-        Table::builder()
-            .metadata(resp)
-            
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
-            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
-            .file_io(FileIOBuilder::new("memory").build().unwrap())
-            .build()
-            .unwrap()
-    }
-
-    fn make_v2_table() -> Table {
-        let file = File::open(format!(
-            "{}/testdata/table_metadata/{}",
-            env!("CARGO_MANIFEST_DIR"),
-            "TableMetadataV2Valid.json"
-        ))
-        .unwrap();
-        let reader = BufReader::new(file);
-        let resp = serde_json::from_reader::<_, 
TableMetadata>(reader).unwrap();
-
-        Table::builder()
-            .metadata(resp)
-            
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
-            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
-            .file_io(FileIOBuilder::new("memory").build().unwrap())
-            .build()
-            .unwrap()
-    }
-
-    fn make_v2_minimal_table() -> Table {
-        let file = File::open(format!(
-            "{}/testdata/table_metadata/{}",
-            env!("CARGO_MANIFEST_DIR"),
-            "TableMetadataV2ValidMinimal.json"
-        ))
-        .unwrap();
-        let reader = BufReader::new(file);
-        let resp = serde_json::from_reader::<_, 
TableMetadata>(reader).unwrap();
-
-        Table::builder()
-            .metadata(resp)
-            
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
-            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
-            .file_io(FileIOBuilder::new("memory").build().unwrap())
-            .build()
-            .unwrap()
-    }
-
-    #[test]
-    fn test_upgrade_table_version_v1_to_v2() {
-        let table = make_v1_table();
-        let tx = Transaction::new(&table);
-        let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
-
-        assert_eq!(
-            vec![TableUpdate::UpgradeFormatVersion {
-                format_version: FormatVersion::V2
-            }],
-            tx.updates
-        );
-    }
-
-    #[test]
-    fn test_upgrade_table_version_v2_to_v2() {
-        let table = make_v2_table();
-        let tx = Transaction::new(&table);
-        let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
-
-        assert!(
-            tx.updates.is_empty(),
-            "Upgrade table to same version should not generate any updates"
-        );
-        assert!(
-            tx.requirements.is_empty(),
-            "Upgrade table to same version should not generate any 
requirements"
-        );
-    }
-
-    #[test]
-    fn test_downgrade_table_version() {
-        let table = make_v2_table();
-        let tx = Transaction::new(&table);
-        let tx = tx.upgrade_table_version(FormatVersion::V1);
-
-        assert!(tx.is_err(), "Downgrade table version should fail!");
-    }
-
-    #[test]
-    fn test_set_table_property() {
-        let table = make_v2_table();
-        let tx = Transaction::new(&table);
-        let tx = tx
-            .set_properties(HashMap::from([("a".to_string(), 
"b".to_string())]))
-            .unwrap();
-
-        assert_eq!(
-            vec![TableUpdate::SetProperties {
-                updates: HashMap::from([("a".to_string(), "b".to_string())])
-            }],
-            tx.updates
-        );
-    }
-
-    #[test]
-    fn test_remove_property() {
-        let table = make_v2_table();
-        let tx = Transaction::new(&table);
-        let tx = tx
-            .remove_properties(vec!["a".to_string(), "b".to_string()])
-            .unwrap();
-
-        assert_eq!(
-            vec![TableUpdate::RemoveProperties {
-                removals: vec!["a".to_string(), "b".to_string()]
-            }],
-            tx.updates
-        );
-    }
-
-    #[test]
-    fn test_replace_sort_order() {
-        let table = make_v2_table();
-        let tx = Transaction::new(&table);
-        let tx = tx.replace_sort_order().apply().unwrap();
-
-        assert_eq!(
-            vec![
-                TableUpdate::AddSortOrder {
-                    sort_order: Default::default()
-                },
-                TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }
-            ],
-            tx.updates
-        );
-
-        assert_eq!(
-            vec![
-                TableRequirement::CurrentSchemaIdMatch {
-                    current_schema_id: 1
-                },
-                TableRequirement::DefaultSortOrderIdMatch {
-                    default_sort_order_id: 3
-                }
-            ],
-            tx.requirements
-        );
-    }
-
-    #[tokio::test]
-    async fn test_fast_append_action() {
-        let table = make_v2_minimal_table();
-        let tx = Transaction::new(&table);
-        let mut action = tx.fast_append(None, vec![]).unwrap();
-
-        // check add data file with incompatible partition value
-        let data_file = DataFileBuilder::default()
-            .content(DataContentType::Data)
-            .file_path("test/3.parquet".to_string())
-            .file_format(DataFileFormat::Parquet)
-            .file_size_in_bytes(100)
-            .record_count(1)
-            .partition(Struct::from_iter([Some(Literal::string("test"))]))
-            .build()
-            .unwrap();
-        assert!(action.add_data_files(vec![data_file.clone()]).is_err());
-
-        let data_file = DataFileBuilder::default()
-            .content(DataContentType::Data)
-            .file_path("test/3.parquet".to_string())
-            .file_format(DataFileFormat::Parquet)
-            .file_size_in_bytes(100)
-            .record_count(1)
-            .partition(Struct::from_iter([Some(Literal::long(300))]))
-            .build()
-            .unwrap();
-        action.add_data_files(vec![data_file.clone()]).unwrap();
-        let tx = action.apply().await.unwrap();
-
-        // check updates and requirements
-        assert!(
-            matches!((&tx.updates[0],&tx.updates[1]), 
(TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { 
reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && 
ref_name == MAIN_BRANCH)
-        );
-        assert_eq!(
-            vec![
-                TableRequirement::UuidMatch {
-                    uuid: tx.table.metadata().uuid()
-                },
-                TableRequirement::RefSnapshotIdMatch {
-                    r#ref: MAIN_BRANCH.to_string(),
-                    snapshot_id: tx.table.metadata().current_snapshot_id
-                }
-            ],
-            tx.requirements
-        );
-
-        // check manifest list
-        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = 
&tx.updates[0] {
-            snapshot
-        } else {
-            unreachable!()
-        };
-        let manifest_list = new_snapshot
-            .load_manifest_list(table.file_io(), table.metadata())
-            .await
-            .unwrap();
-        assert_eq!(1, manifest_list.entries().len());
-        assert_eq!(
-            manifest_list.entries()[0].sequence_number,
-            new_snapshot.sequence_number()
-        );
-
-        // check manifset
-        let manifest = manifest_list.entries()[0]
-            .load_manifest(table.file_io())
-            .await
-            .unwrap();
-        assert_eq!(1, manifest.entries().len());
-        assert_eq!(
-            new_snapshot.sequence_number(),
-            manifest.entries()[0]
-                .sequence_number()
-                .expect("Inherit sequence number by load manifest")
-        );
-
-        assert_eq!(
-            new_snapshot.snapshot_id(),
-            manifest.entries()[0].snapshot_id().unwrap()
-        );
-        assert_eq!(data_file, *manifest.entries()[0].data_file());
-    }
-
-    #[test]
-    fn test_do_same_update_in_same_transaction() {
-        let table = make_v2_table();
-        let tx = Transaction::new(&table);
-        let tx = tx
-            .remove_properties(vec!["a".to_string(), "b".to_string()])
-            .unwrap();
-
-        let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
-
-        assert!(
-            tx.is_err(),
-            "Should not allow to do same kinds update in same transaction"
-        );
-    }
-
-    #[tokio::test]
-    async fn test_add_existing_parquet_files_to_unpartitioned_table() {
-        let mut fixture = TableTestFixture::new_unpartitioned();
-        fixture.setup_unpartitioned_manifest_files().await;
-        let tx = crate::transaction::Transaction::new(&fixture.table);
-
-        let file_paths = vec![
-            format!("{}/1.parquet", &fixture.table_location),
-            format!("{}/2.parquet", &fixture.table_location),
-            format!("{}/3.parquet", &fixture.table_location),
-        ];
-
-        let fast_append_action = tx.fast_append(None, vec![]).unwrap();
-
-        // Attempt to add the existing Parquet files with fast append.
-        let new_tx = fast_append_action
-            .add_parquet_files(file_paths.clone())
-            .await
-            .expect("Adding existing Parquet files should succeed");
-
-        let mut found_add_snapshot = false;
-        let mut found_set_snapshot_ref = false;
-        for update in new_tx.updates.iter() {
-            match update {
-                TableUpdate::AddSnapshot { .. } => {
-                    found_add_snapshot = true;
-                }
-                TableUpdate::SetSnapshotRef {
-                    ref_name,
-                    reference,
-                } => {
-                    found_set_snapshot_ref = true;
-                    assert_eq!(ref_name, crate::transaction::MAIN_BRANCH);
-                    assert!(reference.snapshot_id > 0);
-                }
-                _ => {}
-            }
-        }
-        assert!(found_add_snapshot);
-        assert!(found_set_snapshot_ref);
-
-        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = 
&new_tx.updates[0] {
-            snapshot
-        } else {
-            panic!("Expected the first update to be an AddSnapshot update");
-        };
-
-        let manifest_list = new_snapshot
-            .load_manifest_list(fixture.table.file_io(), 
fixture.table.metadata())
-            .await
-            .expect("Failed to load manifest list");
-
-        assert_eq!(
-            manifest_list.entries().len(),
-            2,
-            "Expected 2 manifest list entries, got {}",
-            manifest_list.entries().len()
-        );
-
-        // Load the manifest from the manifest list
-        let manifest = manifest_list.entries()[0]
-            .load_manifest(fixture.table.file_io())
-            .await
-            .expect("Failed to load manifest");
-
-        // Check that the manifest contains three entries.
-        assert_eq!(manifest.entries().len(), 3);
-
-        // Verify each file path appears in manifest.
-        let manifest_paths: Vec<String> = manifest
-            .entries()
-            .iter()
-            .map(|entry| entry.data_file().file_path.clone())
-            .collect();
-        for path in file_paths {
-            assert!(manifest_paths.contains(&path));
-        }
-    }
-}
diff --git a/crates/iceberg/src/transaction/append.rs 
b/crates/iceberg/src/transaction/append.rs
new file mode 100644
index 00000000..361924de
--- /dev/null
+++ b/crates/iceberg/src/transaction/append.rs
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use arrow_array::StringArray;
+use futures::TryStreamExt;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
+use crate::transaction::snapshot::{
+    DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
+};
+use crate::transaction::Transaction;
+use crate::writer::file_writer::ParquetWriter;
+use crate::{Error, ErrorKind};
+
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+    check_duplicate: bool,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        commit_uuid: Uuid,
+        key_metadata: Vec<u8>,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+            check_duplicate: true,
+        })
+    }
+
+    /// Set whether to check duplicate files
+    pub fn with_check_duplicate(mut self, v: bool) -> Self {
+        self.check_duplicate = v;
+        self
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Adds existing parquet files
+    #[allow(dead_code)]
+    async fn add_parquet_files(mut self, file_path: Vec<String>) -> 
Result<Transaction<'a>> {
+        if !self
+            .snapshot_produce_action
+            .tx
+            .table
+            .metadata()
+            .default_spec
+            .is_unpartitioned()
+        {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Appending to partitioned tables is not supported",
+            ));
+        }
+
+        let table_metadata = self.snapshot_produce_action.tx.table.metadata();
+
+        let data_files = ParquetWriter::parquet_files_to_data_files(
+            self.snapshot_produce_action.tx.table.file_io(),
+            file_path,
+            table_metadata,
+        )
+        .await?;
+
+        self.add_data_files(data_files)?;
+
+        self.apply().await
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        // Checks duplicate files
+        if self.check_duplicate {
+            let new_files: HashSet<&str> = self
+                .snapshot_produce_action
+                .added_data_files
+                .iter()
+                .map(|df| df.file_path.as_str())
+                .collect();
+
+            let mut manifest_stream = self
+                .snapshot_produce_action
+                .tx
+                .table
+                .inspect()
+                .manifests()
+                .scan()
+                .await?;
+            let mut referenced_files = Vec::new();
+
+            while let Some(batch) = manifest_stream.try_next().await? {
+                let file_path_array = batch
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            "Failed to downcast file_path column to 
StringArray",
+                        )
+                    })?;
+
+                for i in 0..batch.num_rows() {
+                    let file_path = file_path_array.value(i);
+                    if new_files.contains(file_path) {
+                        referenced_files.push(file_path.to_string());
+                    }
+                }
+            }
+
+            if !referenced_files.is_empty() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add files that are already referenced by 
table, files: {}",
+                        referenced_files.join(", ")
+                    ),
+                ));
+            }
+        }
+
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = 
snapshot_produce.tx.table.metadata().current_snapshot() else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
+            .cloned()
+            .collect())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::scan::tests::TableTestFixture;
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, 
MAIN_BRANCH,
+    };
+    use crate::transaction::tests::make_v2_minimal_table;
+    use crate::transaction::Transaction;
+    use crate::{TableRequirement, TableUpdate};
+
+    #[tokio::test]
+    async fn test_fast_append_action() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+        let mut action = tx.fast_append(None, vec![]).unwrap();
+
+        // check add data file with incompatible partition value
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/3.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(1)
+            .partition(Struct::from_iter([Some(Literal::string("test"))]))
+            .build()
+            .unwrap();
+        assert!(action.add_data_files(vec![data_file.clone()]).is_err());
+
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/3.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(1)
+            .partition(Struct::from_iter([Some(Literal::long(300))]))
+            .build()
+            .unwrap();
+        action.add_data_files(vec![data_file.clone()]).unwrap();
+        let tx = action.apply().await.unwrap();
+
+        // check updates and requirements
+        assert!(
+            matches!((&tx.updates[0],&tx.updates[1]), 
(TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { 
reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && 
ref_name == MAIN_BRANCH)
+        );
+        assert_eq!(
+            vec![
+                TableRequirement::UuidMatch {
+                    uuid: tx.table.metadata().uuid()
+                },
+                TableRequirement::RefSnapshotIdMatch {
+                    r#ref: MAIN_BRANCH.to_string(),
+                    snapshot_id: tx.table.metadata().current_snapshot_id
+                }
+            ],
+            tx.requirements
+        );
+
+        // check manifest list
+        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = 
&tx.updates[0] {
+            snapshot
+        } else {
+            unreachable!()
+        };
+        let manifest_list = new_snapshot
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .unwrap();
+        assert_eq!(1, manifest_list.entries().len());
+        assert_eq!(
+            manifest_list.entries()[0].sequence_number,
+            new_snapshot.sequence_number()
+        );
+
+        // check manifset
+        let manifest = manifest_list.entries()[0]
+            .load_manifest(table.file_io())
+            .await
+            .unwrap();
+        assert_eq!(1, manifest.entries().len());
+        assert_eq!(
+            new_snapshot.sequence_number(),
+            manifest.entries()[0]
+                .sequence_number()
+                .expect("Inherit sequence number by load manifest")
+        );
+
+        assert_eq!(
+            new_snapshot.snapshot_id(),
+            manifest.entries()[0].snapshot_id().unwrap()
+        );
+        assert_eq!(data_file, *manifest.entries()[0].data_file());
+    }
+
+    #[tokio::test]
+    async fn test_add_existing_parquet_files_to_unpartitioned_table() {
+        let mut fixture = TableTestFixture::new_unpartitioned();
+        fixture.setup_unpartitioned_manifest_files().await;
+        let tx = crate::transaction::Transaction::new(&fixture.table);
+
+        let file_paths = vec![
+            format!("{}/1.parquet", &fixture.table_location),
+            format!("{}/2.parquet", &fixture.table_location),
+            format!("{}/3.parquet", &fixture.table_location),
+        ];
+
+        let fast_append_action = tx.fast_append(None, vec![]).unwrap();
+
+        // Attempt to add the existing Parquet files with fast append.
+        let new_tx = fast_append_action
+            .add_parquet_files(file_paths.clone())
+            .await
+            .expect("Adding existing Parquet files should succeed");
+
+        let mut found_add_snapshot = false;
+        let mut found_set_snapshot_ref = false;
+        for update in new_tx.updates.iter() {
+            match update {
+                TableUpdate::AddSnapshot { .. } => {
+                    found_add_snapshot = true;
+                }
+                TableUpdate::SetSnapshotRef {
+                    ref_name,
+                    reference,
+                } => {
+                    found_set_snapshot_ref = true;
+                    assert_eq!(ref_name, MAIN_BRANCH);
+                    assert!(reference.snapshot_id > 0);
+                }
+                _ => {}
+            }
+        }
+        assert!(found_add_snapshot);
+        assert!(found_set_snapshot_ref);
+
+        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = 
&new_tx.updates[0] {
+            snapshot
+        } else {
+            panic!("Expected the first update to be an AddSnapshot update");
+        };
+
+        let manifest_list = new_snapshot
+            .load_manifest_list(fixture.table.file_io(), 
fixture.table.metadata())
+            .await
+            .expect("Failed to load manifest list");
+
+        assert_eq!(
+            manifest_list.entries().len(),
+            2,
+            "Expected 2 manifest list entries, got {}",
+            manifest_list.entries().len()
+        );
+
+        // Load the manifest from the manifest list
+        let manifest = manifest_list.entries()[0]
+            .load_manifest(fixture.table.file_io())
+            .await
+            .expect("Failed to load manifest");
+
+        // Check that the manifest contains three entries.
+        assert_eq!(manifest.entries().len(), 3);
+
+        // Verify each file path appears in manifest.
+        let manifest_paths: Vec<String> = manifest
+            .entries()
+            .iter()
+            .map(|entry| entry.data_file().file_path.clone())
+            .collect();
+        for path in file_paths {
+            assert!(manifest_paths.contains(&path));
+        }
+    }
+}
diff --git a/crates/iceberg/src/transaction/mod.rs 
b/crates/iceberg/src/transaction/mod.rs
new file mode 100644
index 00000000..d3c7bc3f
--- /dev/null
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -0,0 +1,326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains transaction api.
+
+mod append;
+mod snapshot;
+mod sort_order;
+
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::mem::discriminant;
+
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::FormatVersion;
+use crate::table::Table;
+use crate::transaction::append::FastAppendAction;
+use crate::transaction::sort_order::ReplaceSortOrderAction;
+use crate::TableUpdate::UpgradeFormatVersion;
+use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, 
TableUpdate};
+
+/// Table transaction.
+pub struct Transaction<'a> {
+    table: &'a Table,
+    updates: Vec<TableUpdate>,
+    requirements: Vec<TableRequirement>,
+}
+
+impl<'a> Transaction<'a> {
+    /// Creates a new transaction.
+    pub fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            updates: vec![],
+            requirements: vec![],
+        }
+    }
+
+    fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
+        for update in &updates {
+            for up in &self.updates {
+                if discriminant(up) == discriminant(update) {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Cannot apply update with same type at same time: 
{:?}",
+                            update
+                        ),
+                    ));
+                }
+            }
+        }
+        self.updates.extend(updates);
+        Ok(())
+    }
+
+    fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> 
Result<()> {
+        self.requirements.extend(requirements);
+        Ok(())
+    }
+
+    /// Sets table to a new version.
+    pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
+        let current_version = self.table.metadata().format_version();
+        match current_version.cmp(&format_version) {
+            Ordering::Greater => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot downgrade table version from {} to {}",
+                        current_version, format_version
+                    ),
+                ));
+            }
+            Ordering::Less => {
+                self.append_updates(vec![UpgradeFormatVersion { format_version 
}])?;
+            }
+            Ordering::Equal => {
+                // Do nothing.
+            }
+        }
+        Ok(self)
+    }
+
+    /// Update table's property.
+    pub fn set_properties(mut self, props: HashMap<String, String>) -> 
Result<Self> {
+        self.append_updates(vec![TableUpdate::SetProperties { updates: props 
}])?;
+        Ok(self)
+    }
+
+    fn generate_unique_snapshot_id(&self) -> i64 {
+        let generate_random_id = || -> i64 {
+            let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+            let snapshot_id = (lhs ^ rhs) as i64;
+            if snapshot_id < 0 {
+                -snapshot_id
+            } else {
+                snapshot_id
+            }
+        };
+        let mut snapshot_id = generate_random_id();
+        while self
+            .table
+            .metadata()
+            .snapshots()
+            .any(|s| s.snapshot_id() == snapshot_id)
+        {
+            snapshot_id = generate_random_id();
+        }
+        snapshot_id
+    }
+
+    /// Creates a fast append action.
+    pub fn fast_append(
+        self,
+        commit_uuid: Option<Uuid>,
+        key_metadata: Vec<u8>,
+    ) -> Result<FastAppendAction<'a>> {
+        let snapshot_id = self.generate_unique_snapshot_id();
+        FastAppendAction::new(
+            self,
+            snapshot_id,
+            commit_uuid.unwrap_or_else(Uuid::now_v7),
+            key_metadata,
+            HashMap::new(),
+        )
+    }
+
+    /// Creates replace sort order action.
+    pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
+        ReplaceSortOrderAction {
+            tx: self,
+            sort_fields: vec![],
+        }
+    }
+
+    /// Remove properties in table.
+    pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
+        self.append_updates(vec![TableUpdate::RemoveProperties { removals: 
keys }])?;
+        Ok(self)
+    }
+
+    /// Commit transaction.
+    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
+        let table_commit = TableCommit::builder()
+            .ident(self.table.identifier().clone())
+            .updates(self.updates)
+            .requirements(self.requirements)
+            .build();
+
+        catalog.update_table(table_commit).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::fs::File;
+    use std::io::BufReader;
+
+    use crate::io::FileIOBuilder;
+    use crate::spec::{FormatVersion, TableMetadata};
+    use crate::table::Table;
+    use crate::transaction::Transaction;
+    use crate::{TableIdent, TableUpdate};
+
+    fn make_v1_table() -> Table {
+        let file = File::open(format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV1Valid.json"
+        ))
+        .unwrap();
+        let reader = BufReader::new(file);
+        let resp = serde_json::from_reader::<_, 
TableMetadata>(reader).unwrap();
+
+        Table::builder()
+            .metadata(resp)
+            
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+            .file_io(FileIOBuilder::new("memory").build().unwrap())
+            .build()
+            .unwrap()
+    }
+
+    pub fn make_v2_table() -> Table {
+        let file = File::open(format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV2Valid.json"
+        ))
+        .unwrap();
+        let reader = BufReader::new(file);
+        let resp = serde_json::from_reader::<_, 
TableMetadata>(reader).unwrap();
+
+        Table::builder()
+            .metadata(resp)
+            
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+            .file_io(FileIOBuilder::new("memory").build().unwrap())
+            .build()
+            .unwrap()
+    }
+
+    pub fn make_v2_minimal_table() -> Table {
+        let file = File::open(format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV2ValidMinimal.json"
+        ))
+        .unwrap();
+        let reader = BufReader::new(file);
+        let resp = serde_json::from_reader::<_, 
TableMetadata>(reader).unwrap();
+
+        Table::builder()
+            .metadata(resp)
+            
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+            .file_io(FileIOBuilder::new("memory").build().unwrap())
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_upgrade_table_version_v1_to_v2() {
+        let table = make_v1_table();
+        let tx = Transaction::new(&table);
+        let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
+
+        assert_eq!(
+            vec![TableUpdate::UpgradeFormatVersion {
+                format_version: FormatVersion::V2
+            }],
+            tx.updates
+        );
+    }
+
+    #[test]
+    fn test_upgrade_table_version_v2_to_v2() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+        let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
+
+        assert!(
+            tx.updates.is_empty(),
+            "Upgrade table to same version should not generate any updates"
+        );
+        assert!(
+            tx.requirements.is_empty(),
+            "Upgrade table to same version should not generate any 
requirements"
+        );
+    }
+
+    #[test]
+    fn test_downgrade_table_version() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+        let tx = tx.upgrade_table_version(FormatVersion::V1);
+
+        assert!(tx.is_err(), "Downgrade table version should fail!");
+    }
+
+    #[test]
+    fn test_set_table_property() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+        let tx = tx
+            .set_properties(HashMap::from([("a".to_string(), 
"b".to_string())]))
+            .unwrap();
+
+        assert_eq!(
+            vec![TableUpdate::SetProperties {
+                updates: HashMap::from([("a".to_string(), "b".to_string())])
+            }],
+            tx.updates
+        );
+    }
+
+    #[test]
+    fn test_remove_property() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+        let tx = tx
+            .remove_properties(vec!["a".to_string(), "b".to_string()])
+            .unwrap();
+
+        assert_eq!(
+            vec![TableUpdate::RemoveProperties {
+                removals: vec!["a".to_string(), "b".to_string()]
+            }],
+            tx.updates
+        );
+    }
+
+    #[test]
+    fn test_do_same_update_in_same_transaction() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+        let tx = tx
+            .remove_properties(vec!["a".to_string(), "b".to_string()])
+            .unwrap();
+
+        let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
+
+        assert!(
+            tx.is_err(),
+            "Should not allow to do same kinds update in same transaction"
+        );
+    }
+}
diff --git a/crates/iceberg/src/transaction/snapshot.rs 
b/crates/iceberg/src/transaction/snapshot.rs
new file mode 100644
index 00000000..4a3035ba
--- /dev/null
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::future::Future;
+use std::ops::RangeFrom;
+
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::io::OutputFile;
+use crate::spec::{
+    DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, 
ManifestListWriter,
+    ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, 
SnapshotRetention, Struct,
+    StructType, Summary, MAIN_BRANCH,
+};
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
+
+const META_ROOT_PATH: &str = "metadata";
+
+pub(crate) trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+pub(crate) struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> 
Vec<ManifestFile> {
+        manifests
+    }
+}
+
+pub(crate) trait ManifestProcess: Send + Sync {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> 
Vec<ManifestFile>;
+}
+
+pub(crate) struct SnapshotProduceAction<'a> {
+    pub tx: Transaction<'a>,
+    snapshot_id: i64,
+    key_metadata: Vec<u8>,
+    commit_uuid: Uuid,
+    snapshot_properties: HashMap<String, String>,
+    pub added_data_files: Vec<DataFile>,
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
+    manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            snapshot_id,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
+            key_metadata,
+        })
+    }
+
+    // Check if the partition value is compatible with the partition type.
+    fn validate_partition_value(
+        partition_value: &Struct,
+        partition_type: &StructType,
+    ) -> Result<()> {
+        if partition_value.fields().len() != partition_type.fields().len() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Partition value is not compatible with partition type",
+            ));
+        }
+
+        for (value, field) in 
partition_value.fields().iter().zip(partition_type.fields()) {
+            if !field
+                .field_type
+                .as_primitive_type()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Partition field should only be primitive type.",
+                    )
+                })?
+                .compatible(&value.as_primitive_literal().unwrap())
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Partition value is not compatible partition type",
+                ));
+            }
+        }
+        Ok(())
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        let data_files: Vec<DataFile> = data_files.into_iter().collect();
+        for data_file in &data_files {
+            if data_file.content_type() != crate::spec::DataContentType::Data {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Only data content type is allowed for fast append",
+                ));
+            }
+            Self::validate_partition_value(
+                data_file.partition(),
+                self.tx.table.metadata().default_partition_type(),
+            )?;
+        }
+        self.added_data_files.extend(data_files);
+        Ok(self)
+    }
+
+    fn new_manifest_output(&mut self) -> Result<OutputFile> {
+        let new_manifest_path = format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.commit_uuid,
+            self.manifest_counter.next().unwrap(),
+            DataFileFormat::Avro
+        );
+        self.tx.table.file_io().new_output(new_manifest_path)
+    }
+
+    // Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
+        let added_data_files = std::mem::take(&mut self.added_data_files);
+        let snapshot_id = self.snapshot_id;
+        let manifest_entries = added_data_files.into_iter().map(|data_file| {
+            let builder = ManifestEntry::builder()
+                .status(crate::spec::ManifestStatus::Added)
+                .data_file(data_file);
+            if self.tx.table.metadata().format_version() == FormatVersion::V1 {
+                builder.snapshot_id(snapshot_id).build()
+            } else {
+                // For format version > 1, we set the snapshot id at the 
inherited time to avoid rewrite the manifest file when
+                // commit failed.
+                builder.build()
+            }
+        });
+        let mut writer = {
+            let builder = ManifestWriterBuilder::new(
+                self.new_manifest_output()?,
+                Some(self.snapshot_id),
+                self.key_metadata.clone(),
+                self.tx.table.metadata().current_schema().clone(),
+                self.tx
+                    .table
+                    .metadata()
+                    .default_partition_spec()
+                    .as_ref()
+                    .clone(),
+            );
+            if self.tx.table.metadata().format_version() == FormatVersion::V1 {
+                builder.build_v1()
+            } else {
+                builder.build_v2_data()
+            }
+        };
+        for entry in manifest_entries {
+            writer.add_entry(entry)?;
+        }
+        writer.write_manifest_file().await
+    }
+
+    async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+        &mut self,
+        snapshot_produce_operation: &OP,
+        manifest_process: &MP,
+    ) -> Result<Vec<ManifestFile>> {
+        let added_manifest = self.write_added_manifest().await?;
+        let existing_manifests = 
snapshot_produce_operation.existing_manifest(self).await?;
+        // # TODO
+        // Support process delete entries.
+
+        let mut manifest_files = vec![added_manifest];
+        manifest_files.extend(existing_manifests);
+        let manifest_files = 
manifest_process.process_manifeset(manifest_files);
+        Ok(manifest_files)
+    }
+
+    // # TODO
+    // Fulfill this function
+    fn summary<OP: SnapshotProduceOperation>(&self, 
snapshot_produce_operation: &OP) -> Summary {
+        Summary {
+            operation: snapshot_produce_operation.operation(),
+            additional_properties: self.snapshot_properties.clone(),
+        }
+    }
+
+    fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            attempt,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+        mut self,
+        snapshot_produce_operation: OP,
+        process: MP,
+    ) -> Result<Transaction<'a>> {
+        let new_manifests = self
+            .manifest_file(&snapshot_produce_operation, &process)
+            .await?;
+        let next_seq_num = self.tx.table.metadata().next_sequence_number();
+
+        let summary = self.summary(&snapshot_produce_operation);
+
+        let manifest_list_path = self.generate_manifest_list_file_path(0);
+
+        let mut manifest_list_writer = match 
self.tx.table.metadata().format_version() {
+            FormatVersion::V1 => ManifestListWriter::v1(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.tx.table.metadata().current_snapshot_id(),
+            ),
+            FormatVersion::V2 => ManifestListWriter::v2(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.tx.table.metadata().current_snapshot_id(),
+                next_seq_num,
+            ),
+        };
+        manifest_list_writer.add_manifests(new_manifests.into_iter())?;
+        manifest_list_writer.close().await?;
+
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let new_snapshot = Snapshot::builder()
+            .with_manifest_list(manifest_list_path)
+            .with_snapshot_id(self.snapshot_id)
+            
.with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
+            .with_sequence_number(next_seq_num)
+            .with_summary(summary)
+            .with_schema_id(self.tx.table.metadata().current_schema_id())
+            .with_timestamp_ms(commit_ts)
+            .build();
+
+        self.tx.append_updates(vec![
+            TableUpdate::AddSnapshot {
+                snapshot: new_snapshot,
+            },
+            TableUpdate::SetSnapshotRef {
+                ref_name: MAIN_BRANCH.to_string(),
+                reference: SnapshotReference::new(
+                    self.snapshot_id,
+                    SnapshotRetention::branch(None, None, None),
+                ),
+            },
+        ])?;
+        self.tx.append_requirements(vec![
+            TableRequirement::UuidMatch {
+                uuid: self.tx.table.metadata().uuid(),
+            },
+            TableRequirement::RefSnapshotIdMatch {
+                r#ref: MAIN_BRANCH.to_string(),
+                snapshot_id: self.tx.table.metadata().current_snapshot_id(),
+            },
+        ])?;
+        Ok(self.tx)
+    }
+}
diff --git a/crates/iceberg/src/transaction/sort_order.rs 
b/crates/iceberg/src/transaction/sort_order.rs
new file mode 100644
index 00000000..4f21eef0
--- /dev/null
+++ b/crates/iceberg/src/transaction/sort_order.rs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::spec::{NullOrder, SortDirection, SortField, SortOrder, Transform};
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
+
+/// Transaction action for replacing sort order.
+pub struct ReplaceSortOrderAction<'a> {
+    pub tx: Transaction<'a>,
+    pub sort_fields: Vec<SortField>,
+}
+
+impl<'a> ReplaceSortOrderAction<'a> {
+    /// Adds a field for sorting in ascending order.
+    pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
+        self.add_sort_field(name, SortDirection::Ascending, null_order)
+    }
+
+    /// Adds a field for sorting in descending order.
+    pub fn desc(self, name: &str, null_order: NullOrder) -> Result<Self> {
+        self.add_sort_field(name, SortDirection::Descending, null_order)
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub fn apply(mut self) -> Result<Transaction<'a>> {
+        let unbound_sort_order = SortOrder::builder()
+            .with_fields(self.sort_fields)
+            .build_unbound()?;
+
+        let updates = vec![
+            TableUpdate::AddSortOrder {
+                sort_order: unbound_sort_order,
+            },
+            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
+        ];
+
+        let requirements = vec![
+            TableRequirement::CurrentSchemaIdMatch {
+                current_schema_id: 
self.tx.table.metadata().current_schema().schema_id(),
+            },
+            TableRequirement::DefaultSortOrderIdMatch {
+                default_sort_order_id: 
self.tx.table.metadata().default_sort_order().order_id,
+            },
+        ];
+
+        self.tx.append_requirements(requirements)?;
+        self.tx.append_updates(updates)?;
+        Ok(self.tx)
+    }
+
+    fn add_sort_field(
+        mut self,
+        name: &str,
+        sort_direction: SortDirection,
+        null_order: NullOrder,
+    ) -> Result<Self> {
+        let field_id = self
+            .tx
+            .table
+            .metadata()
+            .current_schema()
+            .field_id_by_name(name)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Cannot find field {} in table schema", name),
+                )
+            })?;
+
+        let sort_field = SortField::builder()
+            .source_id(field_id)
+            .transform(Transform::Identity)
+            .direction(sort_direction)
+            .null_order(null_order)
+            .build();
+
+        self.sort_fields.push(sort_field);
+        Ok(self)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::transaction::tests::make_v2_table;
+    use crate::transaction::Transaction;
+    use crate::{TableRequirement, TableUpdate};
+
+    #[test]
+    fn test_replace_sort_order() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+        let tx = tx.replace_sort_order().apply().unwrap();
+
+        assert_eq!(
+            vec![
+                TableUpdate::AddSortOrder {
+                    sort_order: Default::default()
+                },
+                TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }
+            ],
+            tx.updates
+        );
+
+        assert_eq!(
+            vec![
+                TableRequirement::CurrentSchemaIdMatch {
+                    current_schema_id: 1
+                },
+                TableRequirement::DefaultSortOrderIdMatch {
+                    default_sort_order_id: 3
+                }
+            ],
+            tx.requirements
+        );
+    }
+}

Reply via email to