This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f0150d5c feat: Make duplicate check optional for adding parquet files (#1034)
f0150d5c is described below

commit f0150d5c62620fe1fa786fee4c5e83621e21bb13
Author: Jonathan Chen <[email protected]>
AuthorDate: Tue Mar 11 22:41:25 2025 -0400

    feat: Make duplicate check optional for adding parquet files (#1034)
    
    ## Which issue does this PR close?
    
    - Closes #1031.
    
    ## What changes are included in this PR?
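    Added an option, `FastAppendAction::with_check_duplicate`, to enable or disable the duplicate-file check when adding Parquet files; the check remains enabled by default.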
---
 crates/iceberg/src/transaction.rs | 94 ++++++++++++++++++++++-----------------
 1 file changed, 52 insertions(+), 42 deletions(-)

diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index b9459291..15d5c99a 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -177,6 +177,7 @@ impl<'a> Transaction<'a> {
 /// FastAppendAction is a transaction action for fast append data files to the table.
 pub struct FastAppendAction<'a> {
     snapshot_produce_action: SnapshotProduceAction<'a>,
+    check_duplicate: bool,
 }
 
 impl<'a> FastAppendAction<'a> {
@@ -196,9 +197,16 @@ impl<'a> FastAppendAction<'a> {
                 commit_uuid,
                 snapshot_properties,
             )?,
+            check_duplicate: true,
         })
     }
 
+    /// Set whether to check duplicate files
+    pub fn with_check_duplicate(mut self, v: bool) -> Self {
+        self.check_duplicate = v;
+        self
+    }
+
     /// Add data files to the snapshot.
     pub fn add_data_files(
         &mut self,
@@ -242,51 +250,53 @@ impl<'a> FastAppendAction<'a> {
     /// Finished building the action and apply it to the transaction.
     pub async fn apply(self) -> Result<Transaction<'a>> {
         // Checks duplicate files
-        let new_files: HashSet<&str> = self
-            .snapshot_produce_action
-            .added_data_files
-            .iter()
-            .map(|df| df.file_path.as_str())
-            .collect();
-
-        let mut manifest_stream = self
-            .snapshot_produce_action
-            .tx
-            .table
-            .inspect()
-            .manifests()
-            .scan()
-            .await?;
-        let mut referenced_files = Vec::new();
-
-        while let Some(batch) = manifest_stream.try_next().await? {
-            let file_path_array = batch
-                .column(1)
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        "Failed to downcast file_path column to StringArray",
-                    )
-                })?;
-
-            for i in 0..batch.num_rows() {
-                let file_path = file_path_array.value(i);
-                if new_files.contains(file_path) {
-                    referenced_files.push(file_path.to_string());
+        if self.check_duplicate {
+            let new_files: HashSet<&str> = self
+                .snapshot_produce_action
+                .added_data_files
+                .iter()
+                .map(|df| df.file_path.as_str())
+                .collect();
+
+            let mut manifest_stream = self
+                .snapshot_produce_action
+                .tx
+                .table
+                .inspect()
+                .manifests()
+                .scan()
+                .await?;
+            let mut referenced_files = Vec::new();
+
+            while let Some(batch) = manifest_stream.try_next().await? {
+                let file_path_array = batch
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            "Failed to downcast file_path column to StringArray",
+                        )
+                    })?;
+
+                for i in 0..batch.num_rows() {
+                    let file_path = file_path_array.value(i);
+                    if new_files.contains(file_path) {
+                        referenced_files.push(file_path.to_string());
+                    }
                 }
             }
-        }
 
-        if !referenced_files.is_empty() {
-            return Err(Error::new(
-                ErrorKind::DataInvalid,
-                format!(
-                    "Cannot add files that are already referenced by table, files: {}",
-                    referenced_files.join(", ")
-                ),
-            ));
+            if !referenced_files.is_empty() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add files that are already referenced by table, files: {}",
+                        referenced_files.join(", ")
+                    ),
+                ));
+            }
         }
 
         self.snapshot_produce_action

Reply via email to