This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 835f528f fix(reader): Equality delete files with partial schemas
(containing only equality columns) (#1782)
835f528f is described below
commit 835f528f4dd1d700158ce5339e14bc1da04626d1
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Nov 4 04:06:05 2025 -0500
fix(reader): Equality delete files with partial schemas (containing only
equality columns) (#1782)
## What issue does this PR close?
Partially address #1749.
## Rationale for this change
Equality delete files with partial schemas (containing only equality
columns) were hitting Arrow validation errors: "Column 'id' is declared
as non-nullable but contains null values" in Iceberg Java's
TestSparkReaderDeletes suite. The bug occurs because `evolve_schema()`
adds missing columns with NULL values, which fails when those columns
are declared REQUIRED in the table schema.
## What changes are included in this PR?
Change the `evolve_schema()` call to take `equality_ids` because, per the
Iceberg spec, equality delete schemas should be evolved only for the
`equality_ids` columns, not all table columns.
## Are these changes tested?
`test_partial_schema_equality_deletes_evolve_succeeds`
---
.../src/arrow/caching_delete_file_loader.rs | 118 +++++++++++++++++++--
crates/iceberg/src/arrow/delete_file_loader.rs | 24 +++--
2 files changed, 124 insertions(+), 18 deletions(-)
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 77f29b7f..078635c9 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -224,16 +224,22 @@ impl CachingDeleteFileLoader {
let (sender, receiver) = channel();
del_filter.insert_equality_delete(&task.file_path, receiver);
+ // Per the Iceberg spec, evolve schema for equality deletes
but only for the
+ // equality_ids columns, not all table columns.
+ let equality_ids_vec = task.equality_ids.clone().unwrap();
+ let evolved_stream = BasicDeleteFileLoader::evolve_schema(
+ basic_delete_file_loader
+ .parquet_to_batch_stream(&task.file_path)
+ .await?,
+ schema,
+ &equality_ids_vec,
+ )
+ .await?;
+
Ok(DeleteFileContext::FreshEqDel {
- batch_stream: BasicDeleteFileLoader::evolve_schema(
- basic_delete_file_loader
- .parquet_to_batch_stream(&task.file_path)
- .await?,
- schema,
- )
- .await?,
+ batch_stream: evolved_stream,
sender,
- equality_ids:
HashSet::from_iter(task.equality_ids.clone().unwrap()),
+ equality_ids: HashSet::from_iter(equality_ids_vec),
})
}
@@ -536,6 +542,7 @@ mod tests {
use std::fs::File;
use std::sync::Arc;
+ use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch,
StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields};
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -686,4 +693,99 @@ mod tests {
let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
assert!(result.is_none()); // no pos dels for file 3
}
+
+ /// Verifies that evolve_schema on partial-schema equality deletes works
correctly
+ /// when only equality_ids columns are evolved, not all table columns.
+ ///
+ /// Per the [Iceberg
spec](https://iceberg.apache.org/spec/#equality-delete-files),
+ /// equality delete files can contain only a subset of columns.
+ #[tokio::test]
+ async fn test_partial_schema_equality_deletes_evolve_succeeds() {
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+
+ // Create table schema with REQUIRED fields
+ let table_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ crate::spec::NestedField::required(
+ 1,
+ "id",
+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
+ )
+ .into(),
+ crate::spec::NestedField::required(
+ 2,
+ "data",
+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Write equality delete file with PARTIAL schema (only 'data' column)
+ let delete_file_path = {
+ let data_vals = vec!["a", "d", "g"];
+ let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
+
+ let delete_schema =
Arc::new(arrow_schema::Schema::new(vec![simple_field(
+ "data",
+ DataType::Utf8,
+ false,
+ "2", // field ID
+ )]));
+
+ let delete_batch = RecordBatch::try_new(delete_schema.clone(),
vec![data_col]).unwrap();
+
+ let path = format!("{}/partial-eq-deletes.parquet",
&table_location);
+ let file = File::create(&path).unwrap();
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+ let mut writer =
+ ArrowWriter::try_new(file, delete_batch.schema(),
Some(props)).unwrap();
+ writer.write(&delete_batch).expect("Writing batch");
+ writer.close().unwrap();
+ path
+ };
+
+ let file_io =
FileIO::from_path(table_location).unwrap().build().unwrap();
+ let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone());
+
+ let batch_stream = basic_delete_file_loader
+ .parquet_to_batch_stream(&delete_file_path)
+ .await
+ .unwrap();
+
+ // Only evolve the equality_ids columns (field 2), not all table
columns
+ let equality_ids = vec![2];
+ let evolved_stream =
+ BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema,
&equality_ids)
+ .await
+ .unwrap();
+
+ let result = evolved_stream.try_collect::<Vec<_>>().await;
+
+ assert!(
+ result.is_ok(),
+ "Expected success when evolving only equality_ids columns, got
error: {:?}",
+ result.err()
+ );
+
+ let batches = result.unwrap();
+ assert_eq!(batches.len(), 1);
+
+ let batch = &batches[0];
+ assert_eq!(batch.num_rows(), 3);
+ assert_eq!(batch.num_columns(), 1); // Only 'data' column
+
+ // Verify the actual values are preserved after schema evolution
+ let data_col = batch.column(0).as_string::<i32>();
+ assert_eq!(data_col.value(0), "a");
+ assert_eq!(data_col.value(1), "d");
+ assert_eq!(data_col.value(2), "g");
+ }
}
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs
b/crates/iceberg/src/arrow/delete_file_loader.rs
index f2c63cc5..c0b1392d 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -72,20 +72,17 @@ impl BasicDeleteFileLoader {
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}
- /// Evolves the schema of the RecordBatches from an equality delete file
+ /// Evolves the schema of the RecordBatches from an equality delete file.
+ ///
+ /// Per the [Iceberg
spec](https://iceberg.apache.org/spec/#equality-delete-files),
+ /// only evolves the specified `equality_ids` columns, not all table
columns.
pub(crate) async fn evolve_schema(
record_batch_stream: ArrowRecordBatchStream,
target_schema: Arc<Schema>,
+ equality_ids: &[i32],
) -> Result<ArrowRecordBatchStream> {
- let eq_ids = target_schema
- .as_ref()
- .field_id_to_name_map()
- .keys()
- .cloned()
- .collect::<Vec<_>>();
-
let mut record_batch_transformer =
- RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
+ RecordBatchTransformer::build(target_schema.clone(), equality_ids);
let record_batch_stream = record_batch_stream.map(move |record_batch| {
record_batch.and_then(|record_batch| {
@@ -106,7 +103,14 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
) -> Result<ArrowRecordBatchStream> {
let raw_batch_stream =
self.parquet_to_batch_stream(&task.file_path).await?;
- Self::evolve_schema(raw_batch_stream, schema).await
+ // For equality deletes, only evolve the equality_ids columns.
+ // For positional deletes (equality_ids is None), use all field IDs.
+ let field_ids = match &task.equality_ids {
+ Some(ids) => ids.clone(),
+ None => schema.field_id_to_name_map().keys().cloned().collect(),
+ };
+
+ Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
}
}