This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6bda65882 Add case-sensitive support for equality deletes in DeleteFilter (#1930)
6bda65882 is described below
commit 6bda658824f65c5f66494e6ae0d235fbe2edfe0c
Author: slfan1989 <[email protected]>
AuthorDate: Tue Dec 16 20:22:07 2025 +0800
Add case-sensitive support for equality deletes in DeleteFilter (#1930)
---
.../src/arrow/caching_delete_file_loader.rs | 1 +
crates/iceberg/src/arrow/delete_filter.rs | 62 ++++++++++++++++++++--
crates/iceberg/src/arrow/reader.rs | 15 ++++++
crates/iceberg/src/scan/context.rs | 5 ++
crates/iceberg/src/scan/mod.rs | 2 +
crates/iceberg/src/scan/task.rs | 3 ++
6 files changed, 85 insertions(+), 3 deletions(-)
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 250fc5e8d..aceeae49f 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -911,6 +911,7 @@ mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
// Load the deletes - should handle both types without error
diff --git a/crates/iceberg/src/arrow/delete_filter.rs
b/crates/iceberg/src/arrow/delete_filter.rs
index 14b5124ee..d05e02899 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -141,8 +141,8 @@ impl DeleteFilter {
return Ok(None);
}
- // TODO: handle case-insensitive case
- let bound_predicate =
combined_predicate.bind(file_scan_task.schema.clone(), false)?;
+ let bound_predicate = combined_predicate
+ .bind(file_scan_task.schema.clone(),
file_scan_task.case_sensitive)?;
Ok(Some(bound_predicate))
}
@@ -211,8 +211,9 @@ pub(crate) mod tests {
use super::*;
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
+ use crate::expr::Reference;
use crate::io::FileIO;
- use crate::spec::{DataFileFormat, Schema};
+ use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType,
Schema, Type};
type ArrowSchemaRef = Arc<ArrowSchema>;
@@ -344,6 +345,7 @@ pub(crate) mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
},
FileScanTask {
start: 0,
@@ -358,6 +360,7 @@ pub(crate) mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
},
];
@@ -380,4 +383,57 @@ pub(crate) mod tests {
];
Arc::new(arrow_schema::Schema::new(fields))
}
+
+ #[tokio::test]
+ async fn test_build_equality_delete_predicate_case_sensitive() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "Id",
Type::Primitive(PrimitiveType::Long)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // ---------- fake FileScanTask ----------
+ let task = FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: "data.parquet".to_string(),
+ data_file_format: crate::spec::DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![],
+ predicate: None,
+ deletes: vec![FileScanTaskDeleteFile {
+ file_path: "eq-del.parquet".to_string(),
+ file_type: DataContentType::EqualityDeletes,
+ partition_spec_id: 0,
+ equality_ids: None,
+ }],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
+ case_sensitive: true,
+ };
+
+ let filter = DeleteFilter::default();
+
+ // ---------- insert equality delete predicate ----------
+ let pred = Reference::new("id").equal_to(Datum::long(10));
+
+ let (tx, rx) = tokio::sync::oneshot::channel();
+ filter.insert_equality_delete("eq-del.parquet", rx);
+
+ tx.send(pred).unwrap();
+
+ // ---------- should FAIL ----------
+ let result = filter.build_equality_delete_predicate(&task).await;
+
+ assert!(
+ result.is_err(),
+ "case_sensitive=true should fail when column case mismatches"
+ );
+ }
}
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index 6209c1e26..f7f90663a 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -2082,6 +2082,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2403,6 +2404,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
// Task 2: read the second and third row groups
@@ -2419,6 +2421,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as
FileScanTaskStream;
@@ -2546,6 +2549,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2717,6 +2721,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as
FileScanTaskStream;
@@ -2934,6 +2939,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as
FileScanTaskStream;
@@ -3144,6 +3150,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as
FileScanTaskStream;
@@ -3247,6 +3254,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3344,6 +3352,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3430,6 +3439,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3530,6 +3540,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3659,6 +3670,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3755,6 +3767,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3864,6 +3877,7 @@ message schema {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -4003,6 +4017,7 @@ message schema {
partition: Some(partition_data),
partition_spec: Some(partition_spec),
name_mapping: None,
+ case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
diff --git a/crates/iceberg/src/scan/context.rs
b/crates/iceberg/src/scan/context.rs
index f28b6b090..169d8e640 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -46,6 +46,7 @@ pub(crate) struct ManifestFileContext {
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: DeleteFileIndex,
+ case_sensitive: bool,
}
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext {
pub partition_spec_id: i32,
pub snapshot_schema: SchemaRef,
pub delete_file_index: DeleteFileIndex,
+ pub case_sensitive: bool,
}
impl ManifestFileContext {
@@ -89,6 +91,7 @@ impl ManifestFileContext {
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
delete_file_index: delete_file_index.clone(),
+ case_sensitive: self.case_sensitive,
};
sender
@@ -135,6 +138,7 @@ impl ManifestEntryContext {
partition_spec: None,
// TODO: Extract name_mapping from table metadata property
"schema.name-mapping.default"
name_mapping: None,
+ case_sensitive: self.case_sensitive,
})
}
}
@@ -277,6 +281,7 @@ impl PlanContext {
field_ids: self.field_ids.clone(),
expression_evaluator_cache:
self.expression_evaluator_cache.clone(),
delete_file_index,
+ case_sensitive: self.case_sensitive,
}
}
}
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 1f7fa50df..c055c12c9 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1885,6 +1885,7 @@ pub mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
test_fn(task);
@@ -1902,6 +1903,7 @@ pub mod tests {
partition: None,
partition_spec: None,
name_mapping: None,
+ case_sensitive: false,
};
test_fn(task);
}
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index e1ef241a5..5349a9bdd 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -104,6 +104,9 @@ pub struct FileScanTask {
#[serde(serialize_with = "serialize_not_implemented")]
#[serde(deserialize_with = "deserialize_not_implemented")]
pub name_mapping: Option<Arc<NameMapping>>,
+
+ /// Whether this scan task should treat column names as case-sensitive
when binding predicates.
+ pub case_sensitive: bool,
}
impl FileScanTask {