This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d03c4f86 Migrate to arrow-* v53 (#626)
d03c4f86 is described below
commit d03c4f865cc9775d1dc96fabf32c19a5b8c72d7b
Author: Scott Donnelly <[email protected]>
AuthorDate: Mon Sep 23 08:28:52 2024 +0100
Migrate to arrow-* v53 (#626)
* chore: migrate to arrow-* v53
* chore: update datafusion to 42
* test: fix incorrect test assertion
* chore: update python bindings to arrow 53
---
Cargo.toml | 14 +-
bindings/python/Cargo.toml | 4 +-
crates/iceberg/src/arrow/schema.rs | 92 ++++++-----
.../expr/visitors/row_group_metrics_evaluator.rs | 173 ++++++++++++++-------
crates/integrations/datafusion/Cargo.toml | 2 +-
5 files changed, 188 insertions(+), 97 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index fb3ce08f..82f98103 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,12 +39,12 @@ rust-version = "1.77.1"
anyhow = "1.0.72"
apache-avro = "0.17"
array-init = "2"
-arrow-arith = { version = "52" }
-arrow-array = { version = "52" }
-arrow-ord = { version = "52" }
-arrow-schema = { version = "52" }
-arrow-select = { version = "52" }
-arrow-string = { version = "52" }
+arrow-arith = { version = "53" }
+arrow-array = { version = "53" }
+arrow-ord = { version = "53" }
+arrow-schema = { version = "53" }
+arrow-select = { version = "53" }
+arrow-string = { version = "53" }
async-stream = "0.3.5"
async-trait = "0.1"
async-std = "1.12"
@@ -72,7 +72,7 @@ murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.50"
ordered-float = "4"
-parquet = "52"
+parquet = "53"
paste = "1"
pilota = "0.11.2"
pretty_assertions = "1.4"
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 469fc769..8fd71592 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -32,5 +32,5 @@ crate-type = ["cdylib"]
[dependencies]
iceberg = { path = "../../crates/iceberg" }
-pyo3 = { version = "0.21", features = ["extension-module"] }
-arrow = { version = "52", features = ["pyarrow"] }
+pyo3 = { version = "0.22.3", features = ["extension-module"] }
+arrow = { version = "53", features = ["pyarrow"] }
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index 08600664..6c97621c 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -665,43 +665,57 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
}
macro_rules! get_parquet_stat_as_datum {
- ($limit_type:ident) => {
+ ($limit_type:tt) => {
paste::paste! {
/// Gets the $limit_type value from a parquet Statistics struct, as a
Datum
pub(crate) fn [<get_parquet_stat_ $limit_type _as_datum>](
primitive_type: &PrimitiveType, stats: &Statistics
) -> Result<Option<Datum>> {
- Ok(Some(match (primitive_type, stats) {
- (PrimitiveType::Boolean, Statistics::Boolean(stats)) =>
Datum::bool(*stats.$limit_type()),
- (PrimitiveType::Int, Statistics::Int32(stats)) =>
Datum::int(*stats.$limit_type()),
- (PrimitiveType::Date, Statistics::Int32(stats)) =>
Datum::date(*stats.$limit_type()),
- (PrimitiveType::Long, Statistics::Int64(stats)) =>
Datum::long(*stats.$limit_type()),
- (PrimitiveType::Time, Statistics::Int64(stats)) =>
Datum::time_micros(*stats.$limit_type())?,
+ Ok(match (primitive_type, stats) {
+ (PrimitiveType::Boolean, Statistics::Boolean(stats)) =>
stats.[<$limit_type _opt>]().map(|val|Datum::bool(*val)),
+ (PrimitiveType::Int, Statistics::Int32(stats)) =>
stats.[<$limit_type _opt>]().map(|val|Datum::int(*val)),
+ (PrimitiveType::Date, Statistics::Int32(stats)) =>
stats.[<$limit_type _opt>]().map(|val|Datum::date(*val)),
+ (PrimitiveType::Long, Statistics::Int64(stats)) =>
stats.[<$limit_type _opt>]().map(|val|Datum::long(*val)),
+ (PrimitiveType::Time, Statistics::Int64(stats)) => {
+ let Some(val) = stats.[<$limit_type _opt>]() else {
+ return Ok(None);
+ };
+
+ Some(Datum::time_micros(*val)?)
+ }
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
- Datum::timestamp_micros(*stats.$limit_type())
+ stats.[<$limit_type
_opt>]().map(|val|Datum::timestamp_micros(*val))
}
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
- Datum::timestamptz_micros(*stats.$limit_type())
+ stats.[<$limit_type
_opt>]().map(|val|Datum::timestamptz_micros(*val))
}
(PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
- Datum::timestamp_nanos(*stats.$limit_type())
+ stats.[<$limit_type
_opt>]().map(|val|Datum::timestamp_nanos(*val))
}
(PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
- Datum::timestamptz_nanos(*stats.$limit_type())
+ stats.[<$limit_type
_opt>]().map(|val|Datum::timestamptz_nanos(*val))
}
- (PrimitiveType::Float, Statistics::Float(stats)) =>
Datum::float(*stats.$limit_type()),
- (PrimitiveType::Double, Statistics::Double(stats)) =>
Datum::double(*stats.$limit_type()),
+ (PrimitiveType::Float, Statistics::Float(stats)) =>
stats.[<$limit_type _opt>]().map(|val|Datum::float(*val)),
+ (PrimitiveType::Double, Statistics::Double(stats)) =>
stats.[<$limit_type _opt>]().map(|val|Datum::double(*val)),
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
- Datum::string(stats.$limit_type().as_utf8()?)
+ let Some(val) = stats.[<$limit_type _opt>]() else {
+ return Ok(None);
+ };
+
+ Some(Datum::string(val.as_utf8()?))
}
(PrimitiveType::Decimal {
precision: _,
scale: _,
}, Statistics::ByteArray(stats)) => {
- Datum::new(
+ let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
+ return Ok(None);
+ };
+
+ Some(Datum::new(
primitive_type.clone(),
-
PrimitiveLiteral::Int128(i128::from_le_bytes(stats.[<$limit_type
_bytes>]().try_into()?)),
- )
+
PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)),
+ ))
}
(
PrimitiveType::Decimal {
@@ -709,10 +723,12 @@ macro_rules! get_parquet_stat_as_datum {
scale: _,
},
Statistics::Int32(stats)) => {
- Datum::new(
- primitive_type.clone(),
-
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
- )
+ stats.[<$limit_type _opt>]().map(|val| {
+ Datum::new(
+ primitive_type.clone(),
+ PrimitiveLiteral::Int128(i128::from(*val)),
+ )
+ })
}
(
@@ -722,40 +738,46 @@ macro_rules! get_parquet_stat_as_datum {
},
Statistics::Int64(stats),
) => {
- Datum::new(
- primitive_type.clone(),
-
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
- )
+ stats.[<$limit_type _opt>]().map(|val| {
+ Datum::new(
+ primitive_type.clone(),
+ PrimitiveLiteral::Int128(i128::from(*val)),
+ )
+ })
}
(PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) =>
{
- let raw = stats.[<$limit_type _bytes>]();
- if raw.len() != 16 {
+ let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
+ return Ok(None);
+ };
+ if bytes.len() != 16 {
return Err(Error::new(
ErrorKind::Unexpected,
"Invalid length of uuid bytes.",
));
}
- Datum::uuid(Uuid::from_bytes(
- raw[..16].try_into().unwrap(),
- ))
+ Some(Datum::uuid(Uuid::from_bytes(
+ bytes[..16].try_into().unwrap(),
+ )))
}
(PrimitiveType::Fixed(len),
Statistics::FixedLenByteArray(stat)) => {
- let raw = stat.[<$limit_type _bytes>]();
- if raw.len() != *len as usize {
+ let Some(bytes) = stat.[<$limit_type _bytes_opt>]() else {
+ return Ok(None);
+ };
+ if bytes.len() != *len as usize {
return Err(Error::new(
ErrorKind::Unexpected,
"Invalid length of fixed bytes.",
));
}
- Datum::fixed(raw.to_vec())
+ Some(Datum::fixed(bytes.to_vec()))
}
(PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
- Datum::binary(stat.[<$limit_type _bytes>]().to_vec())
+ return Ok(stat.[<$limit_type
_bytes_opt>]().map(|bytes|Datum::binary(bytes.to_vec())))
}
_ => {
return Ok(None);
}
- }))
+ })
}
}
}
diff --git a/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs
index 4bf53d6e..56f8db8b 100644
--- a/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs
@@ -81,8 +81,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> {
}
fn null_count(&self, field_id: i32) -> Option<u64> {
- self.stats_for_field_id(field_id)
- .map(|stats| stats.null_count())
+ self.stats_for_field_id(field_id)?.null_count_opt()
}
fn value_count(&self) -> u64 {
@@ -141,10 +140,6 @@ impl<'a> RowGroupMetricsEvaluator<'a> {
return Ok(None);
};
- if !stats.has_min_max_set() {
- return Ok(None);
- }
-
get_parquet_stat_min_as_datum(&primitive_type, stats)
}
@@ -153,10 +148,6 @@ impl<'a> RowGroupMetricsEvaluator<'a> {
return Ok(None);
};
- if !stats.has_min_max_set() {
- return Ok(None);
- }
-
get_parquet_stat_max_as_datum(&primitive_type, stats)
}
@@ -593,7 +584,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(None, None, None, 1, false)),
+ Some(Statistics::float(None, None, None, Some(1), false)),
1,
None,
)?;
@@ -620,7 +611,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(None, None, None, 1, false)),
+ Some(Statistics::float(None, None, None, Some(1), false)),
1,
None,
)?;
@@ -647,7 +638,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(None, None, None, 0, false)),
+ Some(Statistics::float(None, None, None, Some(0), false)),
1,
None,
)?;
@@ -674,7 +665,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(None, None, None, 0, false)),
+ Some(Statistics::float(None, None, None, Some(0), false)),
1,
None,
)?;
@@ -701,7 +692,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(None, None, None, 1, false)),
+ Some(Statistics::float(None, None, None, Some(1), false)),
1,
None,
)?;
@@ -728,7 +719,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(2.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(2.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -755,7 +752,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(None, Some(2.0), None, 0, false)),
+ Some(Statistics::float(None, Some(2.0), None, Some(0), false)),
1,
None,
)?;
@@ -782,7 +779,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(0.9), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(0.9),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -809,7 +812,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(2.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(2.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -836,7 +845,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(None, None, None, 1, false)),
+ Some(Statistics::float(None, None, None, Some(1), false)),
1,
None,
)?;
@@ -863,7 +872,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(f32::NAN), Some(2.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(f32::NAN),
+ Some(2.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -890,7 +905,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(1.5), Some(2.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(1.5),
+ Some(2.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -917,7 +938,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(f32::NAN), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(f32::NAN),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -944,7 +971,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(0.5), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(0.5),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -971,7 +1004,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(2.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(2.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -998,7 +1037,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(1.0), Some(1.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(1.0),
+ Some(1.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -1027,7 +1072,7 @@ mod tests {
1,
None,
1,
- Some(Statistics::byte_array(None, None, None, 1, false)),
+ Some(Statistics::byte_array(None, None, None, Some(1), false)),
)?;
let (iceberg_schema_ref, field_id_map) =
build_iceberg_schema_and_field_map()?;
@@ -1054,7 +1099,7 @@ mod tests {
1,
None,
1,
- Some(Statistics::byte_array(None, None, None, 0, false)),
+ Some(Statistics::byte_array(None, None, None, Some(0), false)),
)?;
let (iceberg_schema_ref, field_id_map) =
build_iceberg_schema_and_field_map()?;
@@ -1086,7 +1131,7 @@ mod tests {
Some(ByteArray::from(vec![255u8])),
Some(ByteArray::from(vec![32u8])),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1120,7 +1165,7 @@ mod tests {
Some(ByteArray::from("ice".as_bytes())),
Some(ByteArray::from(vec![255u8])),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1150,7 +1195,7 @@ mod tests {
None,
1,
// Max val of 0xFF is not valid utf8
- Some(Statistics::byte_array(None, None, None, 1, false)),
+ Some(Statistics::byte_array(None, None, None, Some(1), false)),
)?;
let (iceberg_schema_ref, field_id_map) =
build_iceberg_schema_and_field_map()?;
@@ -1182,7 +1227,7 @@ mod tests {
Some(ByteArray::from("id".as_bytes())),
Some(ByteArray::from("ie".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1216,7 +1261,7 @@ mod tests {
Some(ByteArray::from("h".as_bytes())),
Some(ByteArray::from("ib".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1250,7 +1295,7 @@ mod tests {
Some(ByteArray::from("h".as_bytes())),
Some(ByteArray::from("j".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1279,7 +1324,7 @@ mod tests {
1,
None,
1,
- Some(Statistics::byte_array(None, None, None, 1, false)),
+ Some(Statistics::byte_array(None, None, None, Some(1), false)),
)?;
let (iceberg_schema_ref, field_id_map) =
build_iceberg_schema_and_field_map()?;
@@ -1311,7 +1356,7 @@ mod tests {
Some(ByteArray::from(vec![255u8])),
Some(ByteArray::from(vec![32u8])),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1345,7 +1390,7 @@ mod tests {
Some(ByteArray::from("iceberg".as_bytes())),
Some(ByteArray::from(vec![255u8])),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1379,7 +1424,7 @@ mod tests {
None,
Some(ByteArray::from("iceberg".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1413,7 +1458,7 @@ mod tests {
Some(ByteArray::from("ice".as_bytes())),
Some(ByteArray::from("iceberg".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1447,7 +1492,7 @@ mod tests {
Some(ByteArray::from("iceberg".as_bytes())),
None,
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1481,7 +1526,7 @@ mod tests {
Some(ByteArray::from("iceberg".as_bytes())),
Some(ByteArray::from("icy".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1515,7 +1560,7 @@ mod tests {
Some(ByteArray::from("iceberg".as_bytes())),
Some(ByteArray::from("iceberg".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1548,7 +1593,7 @@ mod tests {
Some(ByteArray::from("iceberg".as_bytes())),
Some(ByteArray::from("iceberg".as_bytes())),
None,
- 1,
+ Some(1),
false,
)),
)?;
@@ -1577,7 +1622,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(11.0), Some(12.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(11.0),
+ Some(12.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -1606,7 +1657,7 @@ mod tests {
1,
None,
1,
- Some(Statistics::byte_array(None, None, None, 0, false)),
+ Some(Statistics::byte_array(None, None, None, Some(0), false)),
)?;
let (iceberg_schema_ref, field_id_map) =
build_iceberg_schema_and_field_map()?;
@@ -1633,7 +1684,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(f32::NAN), Some(1.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(f32::NAN),
+ Some(1.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -1660,7 +1717,7 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(4.0), None, None, 0, false)),
+ Some(Statistics::float(Some(4.0), None, None, Some(0), false)),
1,
None,
)?;
@@ -1678,7 +1735,7 @@ mod tests {
iceberg_schema_ref.as_ref(),
)?;
- assert!(result);
+ assert!(!result);
Ok(())
}
@@ -1687,7 +1744,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(f32::NAN), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(f32::NAN),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -1714,7 +1777,13 @@ mod tests {
let row_group_metadata = create_row_group_metadata(
1,
1,
- Some(Statistics::float(Some(0.0), Some(1.0), None, 0, false)),
+ Some(Statistics::float(
+ Some(0.0),
+ Some(1.0),
+ None,
+ Some(0),
+ false,
+ )),
1,
None,
)?;
@@ -1748,7 +1817,7 @@ mod tests {
Some(ByteArray::from("iceberg".as_bytes())),
Some(ByteArray::from("iceberg".as_bytes())),
None,
- 0,
+ Some(0),
false,
)),
)?;
@@ -1862,7 +1931,7 @@ mod tests {
.set_num_rows(num_rows)
.set_column_metadata(vec![
col_1_meta.build()?,
- // .set_statistics(Statistics::float(None, None, None, 1,
false))
+ // .set_statistics(Statistics::float(None, None, None,
Some(1), false))
col_2_meta.build()?,
])
.build();
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 87e809ce..c4acf117 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -31,7 +31,7 @@ keywords = ["iceberg", "integrations", "datafusion"]
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
-datafusion = { version = "41.0.0" }
+datafusion = { version = "42" }
futures = { workspace = true }
iceberg = { workspace = true }
tokio = { workspace = true }