This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f0150d5c feat: Make duplicate check optional for adding parquet files (#1034)
f0150d5c is described below
commit f0150d5c62620fe1fa786fee4c5e83621e21bb13
Author: Jonathan Chen <[email protected]>
AuthorDate: Tue Mar 11 22:41:25 2025 -0400
feat: Make duplicate check optional for adding parquet files (#1034)
## Which issue does this PR close?
- Closes #1031.
## What changes are included in this PR?
Added option for checking duplicates when adding parquet files.
---
crates/iceberg/src/transaction.rs | 94 ++++++++++++++++++++++-----------------
1 file changed, 52 insertions(+), 42 deletions(-)
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index b9459291..15d5c99a 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -177,6 +177,7 @@ impl<'a> Transaction<'a> {
/// FastAppendAction is a transaction action for fast append data files to the table.
pub struct FastAppendAction<'a> {
snapshot_produce_action: SnapshotProduceAction<'a>,
+ check_duplicate: bool,
}
impl<'a> FastAppendAction<'a> {
@@ -196,9 +197,16 @@ impl<'a> FastAppendAction<'a> {
commit_uuid,
snapshot_properties,
)?,
+ check_duplicate: true,
})
}
+ /// Set whether to check duplicate files
+ pub fn with_check_duplicate(mut self, v: bool) -> Self {
+ self.check_duplicate = v;
+ self
+ }
+
/// Add data files to the snapshot.
pub fn add_data_files(
&mut self,
@@ -242,51 +250,53 @@ impl<'a> FastAppendAction<'a> {
/// Finished building the action and apply it to the transaction.
pub async fn apply(self) -> Result<Transaction<'a>> {
// Checks duplicate files
- let new_files: HashSet<&str> = self
- .snapshot_produce_action
- .added_data_files
- .iter()
- .map(|df| df.file_path.as_str())
- .collect();
-
- let mut manifest_stream = self
- .snapshot_produce_action
- .tx
- .table
- .inspect()
- .manifests()
- .scan()
- .await?;
- let mut referenced_files = Vec::new();
-
- while let Some(batch) = manifest_stream.try_next().await? {
- let file_path_array = batch
- .column(1)
- .as_any()
- .downcast_ref::<StringArray>()
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- "Failed to downcast file_path column to StringArray",
- )
- })?;
-
- for i in 0..batch.num_rows() {
- let file_path = file_path_array.value(i);
- if new_files.contains(file_path) {
- referenced_files.push(file_path.to_string());
+ if self.check_duplicate {
+ let new_files: HashSet<&str> = self
+ .snapshot_produce_action
+ .added_data_files
+ .iter()
+ .map(|df| df.file_path.as_str())
+ .collect();
+
+ let mut manifest_stream = self
+ .snapshot_produce_action
+ .tx
+ .table
+ .inspect()
+ .manifests()
+ .scan()
+ .await?;
+ let mut referenced_files = Vec::new();
+
+ while let Some(batch) = manifest_stream.try_next().await? {
+ let file_path_array = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+                            "Failed to downcast file_path column to StringArray",
+ )
+ })?;
+
+ for i in 0..batch.num_rows() {
+ let file_path = file_path_array.value(i);
+ if new_files.contains(file_path) {
+ referenced_files.push(file_path.to_string());
+ }
}
}
- }
- if !referenced_files.is_empty() {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
-                "Cannot add files that are already referenced by table, files: {}",
- referenced_files.join(", ")
- ),
- ));
+ if !referenced_files.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+                    "Cannot add files that are already referenced by table, files: {}",
+ referenced_files.join(", ")
+ ),
+ ));
+ }
}
self.snapshot_produce_action