This is an automated email from the ASF dual-hosted git repository. liurenjie1024 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push: new a3bf8294 refine: add new_manifest_writer in SnapshotProducer (#1481) a3bf8294 is described below commit a3bf8294eb3440ac37cfdfd2e4bc2f40c86ae59b Author: ZENOTME <43447882+zeno...@users.noreply.github.com> AuthorDate: Wed Jul 2 18:30:25 2025 +0800 refine: add new_manifest_writer in SnapshotProducer (#1481) ## Which issue does this PR close? This PR adds new_manifest_writer in SnapshotProducer, and this function can be used to create different manifest writers for different actions in the future, e.g. MergeAppend https://github.com/apache/iceberg-rust/pull/902 ## What changes are included in this PR? ## Are these changes tested? Co-authored-by: ZENOTME <st810918...@gmail.com> --- crates/iceberg/src/transaction/snapshot.rs | 48 +++++++++++++++++------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 16b94b08..113e6a76 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -22,12 +22,12 @@ use std::ops::RangeFrom; use uuid::Uuid; use crate::error::Result; -use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestEntry, ManifestFile, - ManifestListWriter, ManifestWriterBuilder, Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, - PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention, - SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, + DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, + ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, + Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, + Summary, 
update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit; @@ -177,7 +177,11 @@ impl SnapshotProducer { snapshot_id } - fn new_manifest_output(&mut self, table: &Table) -> Result<OutputFile> { + fn new_manifest_writer( + &mut self, + content: ManifestContentType, + table: &Table, + ) -> Result<ManifestWriter> { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", table.metadata().location(), @@ -186,7 +190,22 @@ impl SnapshotProducer { self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - table.file_io().new_output(new_manifest_path) + let output_file = table.file_io().new_output(new_manifest_path)?; + let builder = ManifestWriterBuilder::new( + output_file, + Some(self.snapshot_id), + self.key_metadata.clone(), + table.metadata().current_schema().clone(), + table.metadata().default_partition_spec().as_ref().clone(), + ); + if table.metadata().format_version() == FormatVersion::V1 { + Ok(builder.build_v1()) + } else { + match content { + ManifestContentType::Data => Ok(builder.build_v2_data()), + ManifestContentType::Deletes => Ok(builder.build_v2_deletes()), + } + } } // Check if the partition value is compatible with the partition type. @@ -244,20 +263,7 @@ impl SnapshotProducer { builder.build() } }); - let mut writer = { - let builder = ManifestWriterBuilder::new( - self.new_manifest_output(table)?, - Some(self.snapshot_id), - self.key_metadata.clone(), - table.metadata().current_schema().clone(), - table.metadata().default_partition_spec().as_ref().clone(), - ); - if table.metadata().format_version() == FormatVersion::V1 { - builder.build_v1() - } else { - builder.build_v2_data() - } - }; + let mut writer = self.new_manifest_writer(ManifestContentType::Data, table)?; for entry in manifest_entries { writer.add_entry(entry)?; }