This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a6675394 feat(reader): Add binary support to `get_arrow_datum` for equality deletes with binary type (#1848)
a6675394 is described below
commit a6675394c0b15f90e70b907b0742d5be6ac39a3e
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Nov 18 06:00:25 2025 -0500
feat(reader): Add binary support to `get_arrow_datum` for equality deletes with binary type (#1848)
---
crates/iceberg/src/arrow/caching_delete_file_loader.rs | 16 ++++++++++++----
crates/iceberg/src/arrow/schema.rs | 15 +++++++++++++--
2 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index f1c4f86f..192ca390 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -543,7 +543,9 @@ mod tests {
use std::sync::Arc;
use arrow_array::cast::AsArray;
- use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
+ use arrow_array::{
+ ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
+ };
use arrow_schema::{DataType, Field, Fields};
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
@@ -552,6 +554,8 @@ mod tests {
use super::*;
use crate::arrow::delete_filter::tests::setup;
+ use crate::scan::FileScanTaskDeleteFile;
+ use crate::spec::{DataContentType, Schema};
#[tokio::test]
async fn test_delete_file_loader_parse_equality_deletes() {
@@ -567,7 +571,7 @@ mod tests {
.await
.expect("could not get batch stream");
- let eq_ids = HashSet::from_iter(vec![2, 3, 4, 6]);
+ let eq_ids = HashSet::from_iter(vec![2, 3, 4, 6, 8]);
let parsed_eq_delete =
CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
record_batch_stream,
@@ -577,7 +581,7 @@ mod tests {
.expect("error parsing batch stream");
println!("{parsed_eq_delete}");
- let expected = "((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) AND ((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5))".to_string();
+ let expected = "(((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) OR (b != 62696E6172795F64617461)) AND (((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5)) OR (b IS NOT NULL))".to_string();
assert_eq!(parsed_eq_delete.to_string(), expected);
}
@@ -611,6 +615,9 @@ mod tests {
),
]));
+ let col_b_vals = vec![Some(&b"binary_data"[..]), None];
+ let col_b = Arc::new(BinaryArray::from(col_b_vals)) as ArrayRef;
+
let equality_delete_schema = {
let struct_field = DataType::Struct(Fields::from(vec![
simple_field("sa", DataType::Int32, false, "6"),
@@ -628,12 +635,13 @@ mod tests {
(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
])),
simple_field("s", struct_field, false, "5"),
+ simple_field("b", DataType::Binary, true, "8"),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let equality_deletes_to_write =
RecordBatch::try_new(equality_delete_schema.clone(), vec![
- col_y, col_z, col_a, col_s,
+ col_y, col_z, col_a, col_s, col_b,
])
.unwrap();
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index e10db3a5..ec0135bd 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -22,8 +22,8 @@ use std::sync::Arc;
use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
use arrow_array::{
- BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array, FixedSizeBinaryArray,
- Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
+ BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
+ FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
TimestampMicrosecondArray,
};
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
@@ -678,6 +678,9 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send
(PrimitiveType::String, PrimitiveLiteral::String(value)) => {
Ok(Arc::new(StringArray::new_scalar(value.as_str())))
}
+ (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
+ Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
+ }
(PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
Ok(Arc::new(Date32Array::new_scalar(*value)))
}
@@ -1814,6 +1817,14 @@ mod tests {
assert!(is_scalar);
assert_eq!(array.value(0), "abc");
}
+ {
+ let datum = Datum::binary(vec![1, 2, 3, 4]);
+ let arrow_datum = get_arrow_datum(&datum).unwrap();
+ let (array, is_scalar) = arrow_datum.get();
+ let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+ assert!(is_scalar);
+ assert_eq!(array.value(0), &[1, 2, 3, 4]);
+ }
{
let datum = Datum::date(42);
let arrow_datum = get_arrow_datum(&datum).unwrap();