This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bbc8578 feat: add transform_literal (#287)
bbc8578 is described below
commit bbc85782135919c4f4148cd45cb45e34039fc728
Author: ZENOTME <[email protected]>
AuthorDate: Wed Mar 27 12:19:08 2024 +0900
feat: add transform_literal (#287)
* add transform_literal
* refine
* fix unwrap
---------
Co-authored-by: ZENOTME <[email protected]>
---
crates/iceberg/src/spec/values.rs | 10 ++
crates/iceberg/src/transform/bucket.rs | 226 +++++++++++++++++++++--
crates/iceberg/src/transform/identity.rs | 4 +
crates/iceberg/src/transform/mod.rs | 7 +-
crates/iceberg/src/transform/temporal.rs | 300 ++++++++++++++++++++++++++++---
crates/iceberg/src/transform/truncate.rs | 154 +++++++++++++---
crates/iceberg/src/transform/void.rs | 4 +
7 files changed, 639 insertions(+), 66 deletions(-)
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index f31d647..6f62f29 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -673,6 +673,16 @@ impl Datum {
)),
}
}
+
+ /// Get the primitive literal from datum.
+ pub fn literal(&self) -> &PrimitiveLiteral {
+ &self.literal
+ }
+
+ /// Get the primitive type from datum.
+ pub fn data_type(&self) -> &PrimitiveType {
+ &self.r#type
+ }
}
/// Values present in iceberg type
diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs
index beff0be..015acea 100644
--- a/crates/iceberg/src/transform/bucket.rs
+++ b/crates/iceberg/src/transform/bucket.rs
@@ -20,6 +20,8 @@ use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_schema::{DataType, TimeUnit};
+use crate::spec::{Datum, PrimitiveLiteral};
+
use super::TransformFunction;
#[derive(Debug)]
@@ -35,39 +37,47 @@ impl Bucket {
impl Bucket {
/// When switch the hash function, we only need to change this function.
+ #[inline]
fn hash_bytes(mut v: &[u8]) -> i32 {
murmur3::murmur3_32(&mut v, 0).unwrap() as i32
}
+ #[inline]
fn hash_int(v: i32) -> i32 {
Self::hash_long(v as i64)
}
+ #[inline]
fn hash_long(v: i64) -> i32 {
Self::hash_bytes(v.to_le_bytes().as_slice())
}
/// v is days from unix epoch
+ #[inline]
fn hash_date(v: i32) -> i32 {
Self::hash_int(v)
}
/// v is microseconds from midnight
+ #[inline]
fn hash_time(v: i64) -> i32 {
Self::hash_long(v)
}
/// v is microseconds from unix epoch
+ #[inline]
fn hash_timestamp(v: i64) -> i32 {
Self::hash_long(v)
}
+ #[inline]
fn hash_str(s: &str) -> i32 {
Self::hash_bytes(s.as_bytes())
}
/// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian
/// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
+ #[inline]
fn hash_decimal(v: i128) -> i32 {
let bytes = v.to_be_bytes();
if let Some(start) = bytes.iter().position(|&x| x != 0) {
@@ -79,9 +89,50 @@ impl Bucket {
/// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
/// ref: https://iceberg.apache.org/spec/#partitioning
+ #[inline]
fn bucket_n(&self, v: i32) -> i32 {
(v & i32::MAX) % (self.mod_n as i32)
}
+
+ #[inline]
+ fn bucket_int(&self, v: i32) -> i32 {
+ self.bucket_n(Self::hash_int(v))
+ }
+
+ #[inline]
+ fn bucket_long(&self, v: i64) -> i32 {
+ self.bucket_n(Self::hash_long(v))
+ }
+
+ #[inline]
+ fn bucket_decimal(&self, v: i128) -> i32 {
+ self.bucket_n(Self::hash_decimal(v))
+ }
+
+ #[inline]
+ fn bucket_date(&self, v: i32) -> i32 {
+ self.bucket_n(Self::hash_date(v))
+ }
+
+ #[inline]
+ fn bucket_time(&self, v: i64) -> i32 {
+ self.bucket_n(Self::hash_time(v))
+ }
+
+ #[inline]
+ fn bucket_timestamp(&self, v: i64) -> i32 {
+ self.bucket_n(Self::hash_timestamp(v))
+ }
+
+ #[inline]
+ fn bucket_str(&self, v: &str) -> i32 {
+ self.bucket_n(Self::hash_str(v))
+ }
+
+ #[inline]
+ fn bucket_bytes(&self, v: &[u8]) -> i32 {
+ self.bucket_n(Self::hash_bytes(v))
+ }
}
impl TransformFunction for Bucket {
@@ -91,39 +142,39 @@ impl TransformFunction for Bucket {
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap()
- .unary(|v| self.bucket_n(Self::hash_int(v))),
+ .unary(|v| self.bucket_int(v)),
DataType::Int64 => input
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.unwrap()
- .unary(|v| self.bucket_n(Self::hash_long(v))),
+ .unary(|v| self.bucket_long(v)),
DataType::Decimal128(_, _) => input
.as_any()
.downcast_ref::<arrow_array::Decimal128Array>()
.unwrap()
- .unary(|v| self.bucket_n(Self::hash_decimal(v))),
+ .unary(|v| self.bucket_decimal(v)),
DataType::Date32 => input
.as_any()
.downcast_ref::<arrow_array::Date32Array>()
.unwrap()
- .unary(|v| self.bucket_n(Self::hash_date(v))),
+ .unary(|v| self.bucket_date(v)),
DataType::Time64(TimeUnit::Microsecond) => input
.as_any()
.downcast_ref::<arrow_array::Time64MicrosecondArray>()
.unwrap()
- .unary(|v| self.bucket_n(Self::hash_time(v))),
+ .unary(|v| self.bucket_time(v)),
DataType::Timestamp(TimeUnit::Microsecond, _) => input
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
- .unary(|v| self.bucket_n(Self::hash_timestamp(v))),
+ .unary(|v| self.bucket_timestamp(v)),
DataType::Utf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
- .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
+ .map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
input
@@ -131,7 +182,7 @@ impl TransformFunction for Bucket {
.downcast_ref::<arrow_array::LargeStringArray>()
.unwrap()
.iter()
- .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
+ .map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::Binary => arrow_array::Int32Array::from_iter(
input
@@ -139,7 +190,7 @@ impl TransformFunction for Bucket {
.downcast_ref::<arrow_array::BinaryArray>()
.unwrap()
.iter()
- .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
+ .map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::LargeBinary => arrow_array::Int32Array::from_iter(
input
@@ -147,7 +198,7 @@ impl TransformFunction for Bucket {
.downcast_ref::<arrow_array::LargeBinaryArray>()
.unwrap()
.iter()
- .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
+ .map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
input
@@ -155,18 +206,53 @@ impl TransformFunction for Bucket {
.downcast_ref::<arrow_array::FixedSizeBinaryArray>()
.unwrap()
.iter()
- .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
+ .map(|v| v.map(|v| self.bucket_bytes(v))),
),
- _ => unreachable!("Unsupported data type: {:?}", input.data_type()),
+ _ => {
+ return Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for bucket transform: {:?}",
+ input.data_type()
+ ),
+ ))
+ }
};
Ok(Arc::new(res))
}
+
+ fn transform_literal(&self, input: &Datum) -> crate::Result<Option<Datum>> {
+ let val = match input.literal() {
+ PrimitiveLiteral::Int(v) => self.bucket_int(*v),
+ PrimitiveLiteral::Long(v) => self.bucket_long(*v),
+ PrimitiveLiteral::Decimal(v) => self.bucket_decimal(*v),
+ PrimitiveLiteral::Date(v) => self.bucket_date(*v),
+ PrimitiveLiteral::Time(v) => self.bucket_time(*v),
+ PrimitiveLiteral::Timestamp(v) => self.bucket_timestamp(*v),
+ PrimitiveLiteral::String(v) => self.bucket_str(v.as_str()),
+ PrimitiveLiteral::UUID(v) => self.bucket_bytes(v.as_ref()),
+ PrimitiveLiteral::Binary(v) => self.bucket_bytes(v.as_ref()),
+ PrimitiveLiteral::Fixed(v) => self.bucket_bytes(v.as_ref()),
+ _ => {
+ return Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for bucket transform: {:?}",
+ input.data_type()
+ ),
+ ))
+ }
+ };
+ Ok(Some(Datum::int(val)))
+ }
}
#[cfg(test)]
mod test {
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
+ use crate::{spec::Datum, transform::TransformFunction};
+
use super::Bucket;
#[test]
fn test_hash() {
@@ -242,4 +328,120 @@ mod test {
-188683207
);
}
+
+ #[test]
+ fn test_int_literal() {
+ let bucket = Bucket::new(10);
+ assert_eq!(
+ bucket.transform_literal(&Datum::int(34)).unwrap().unwrap(),
+ Datum::int(9)
+ );
+ }
+
+ #[test]
+ fn test_long_literal() {
+ let bucket = Bucket::new(10);
+ assert_eq!(
+ bucket.transform_literal(&Datum::long(34)).unwrap().unwrap(),
+ Datum::int(9)
+ );
+ }
+
+ #[test]
+ fn test_decimal_literal() {
+ let bucket = Bucket::new(10);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::decimal(1420).unwrap())
+ .unwrap()
+ .unwrap(),
+ Datum::int(9)
+ );
+ }
+
+ #[test]
+ fn test_date_literal() {
+ let bucket = Bucket::new(100);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::date(17486))
+ .unwrap()
+ .unwrap(),
+ Datum::int(26)
+ );
+ }
+
+ #[test]
+ fn test_time_literal() {
+ let bucket = Bucket::new(100);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::time_micros(81068000000).unwrap())
+ .unwrap()
+ .unwrap(),
+ Datum::int(59)
+ );
+ }
+
+ #[test]
+ fn test_timestamp_literal() {
+ let bucket = Bucket::new(100);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::timestamp_micros(1510871468000000))
+ .unwrap()
+ .unwrap(),
+ Datum::int(7)
+ );
+ }
+
+ #[test]
+ fn test_str_literal() {
+ let bucket = Bucket::new(100);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::string("iceberg"))
+ .unwrap()
+ .unwrap(),
+ Datum::int(89)
+ );
+ }
+
+ #[test]
+ fn test_uuid_literal() {
+ let bucket = Bucket::new(100);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::uuid(
+ "F79C3E09-677C-4BBD-A479-3F349CB785E7".parse().unwrap()
+ ))
+ .unwrap()
+ .unwrap(),
+ Datum::int(40)
+ );
+ }
+
+ #[test]
+ fn test_binary_literal() {
+ let bucket = Bucket::new(128);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::binary(b"\x00\x01\x02\x03".to_vec()))
+ .unwrap()
+ .unwrap(),
+ Datum::int(57)
+ );
+ }
+
+ #[test]
+ fn test_fixed_literal() {
+ let bucket = Bucket::new(128);
+ assert_eq!(
+ bucket
+ .transform_literal(&Datum::fixed(b"foo".to_vec()))
+ .unwrap()
+ .unwrap(),
+ Datum::int(32)
+ );
+ }
}
diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs
index d22c28f..49ab612 100644
--- a/crates/iceberg/src/transform/identity.rs
+++ b/crates/iceberg/src/transform/identity.rs
@@ -28,4 +28,8 @@ impl TransformFunction for Identity {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
Ok(input)
}
+
+ fn transform_literal(&self, input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
+ Ok(Some(input.clone()))
+ }
}
diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs
index dead9db..7effdbe 100644
--- a/crates/iceberg/src/transform/mod.rs
+++ b/crates/iceberg/src/transform/mod.rs
@@ -16,7 +16,10 @@
// under the License.
//! Transform function used to compute partition values.
-use crate::{spec::Transform, Result};
+use crate::{
+ spec::{Datum, Transform},
+ Result,
+};
use arrow_array::ArrayRef;
mod bucket;
@@ -31,6 +34,8 @@ pub trait TransformFunction: Send {
/// The implementation of this function will need to check and downcast the input to specific
/// type.
fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
+ /// transform_literal will take an input literal and transform it into a new literal.
+ fn transform_literal(&self, input: &Datum) -> Result<Option<Datum>>;
}
/// BoxedTransformFunction is a boxed trait object of TransformFunction.
diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs
index 4556543..2a79db3 100644
--- a/crates/iceberg/src/transform/temporal.rs
+++ b/crates/iceberg/src/transform/temporal.rs
@@ -16,6 +16,7 @@
// under the License.
use super::TransformFunction;
+use crate::spec::{Datum, PrimitiveLiteral};
use crate::{Error, ErrorKind, Result};
use arrow_arith::temporal::DatePart;
use arrow_arith::{arity::binary, temporal::date_part};
@@ -23,11 +24,9 @@ use arrow_array::{
types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray,
};
use arrow_schema::{DataType, TimeUnit};
-use chrono::Datelike;
+use chrono::{DateTime, Datelike};
use std::sync::Arc;
-/// The number of days since unix epoch.
-const DAY_SINCE_UNIX_EPOCH: i32 = 719163;
/// Hour in one second.
const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64;
/// Day in one second.
@@ -39,6 +38,21 @@ const UNIX_EPOCH_YEAR: i32 = 1970;
#[derive(Debug)]
pub struct Year;
+impl Year {
+ #[inline]
+ fn timestamp_to_year(timestamp: i64) -> Result<i32> {
+ Ok(DateTime::from_timestamp_micros(timestamp)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Fail to convert timestamp to date in year transform",
+ )
+ })?
+ .year()
+ - UNIX_EPOCH_YEAR)
+ }
+}
+
impl TransformFunction for Year {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let array = date_part(&input, DatePart::Year)
@@ -51,12 +65,54 @@ impl TransformFunction for Year {
.unary(|v| v - UNIX_EPOCH_YEAR),
))
}
+
+ fn transform_literal(&self, input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
+ let val = match input.literal() {
+ PrimitiveLiteral::Date(v) => Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR,
+ PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_year(*v)?,
+ PrimitiveLiteral::TimestampTZ(v) => Self::timestamp_to_year(*v)?,
+ _ => {
+ return Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for year transform: {:?}",
+ input.data_type()
+ ),
+ ))
+ }
+ };
+ Ok(Some(Datum::int(val)))
+ }
}
/// Extract a date or timestamp month, as months from 1970-01-01
#[derive(Debug)]
pub struct Month;
+impl Month {
+ #[inline]
+ fn timestamp_to_month(timestamp: i64) -> Result<i32> {
+ // date: aaaa-aa-aa
+ // unix epoch date: 1970-01-01
+ // if date > unix epoch date, delta month = (aa - 1) + 12 * (aaaa-1970)
+ // if date < unix epoch date, delta month = (12 - (aa - 1)) + 12 * (1970-aaaa-1)
+ let date = DateTime::from_timestamp_micros(timestamp).ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Fail to convert timestamp to date in month transform",
+ )
+ })?;
+ let unix_epoch_date = DateTime::from_timestamp_micros(0)
+ .expect("0 timestamp from unix epoch should be valid");
+ if date > unix_epoch_date {
+ Ok((date.month0() as i32) + 12 * (date.year() - UNIX_EPOCH_YEAR))
+ } else {
+ let delta = (12 - date.month0() as i32) + 12 * (UNIX_EPOCH_YEAR - date.year() - 1);
+ Ok(-delta)
+ }
+ }
+}
+
impl TransformFunction for Month {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let year_array = date_part(&input, DatePart::Year)
@@ -78,12 +134,40 @@ impl TransformFunction for Month {
.unwrap(),
))
}
+
+ fn transform_literal(&self, input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
+ let val = match input.literal() {
+ PrimitiveLiteral::Date(v) => {
+ (Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR) * 12
+ + Date32Type::to_naive_date(*v).month0() as i32
+ }
+ PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_month(*v)?,
+ PrimitiveLiteral::TimestampTZ(v) => Self::timestamp_to_month(*v)?,
+ _ => {
+ return Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for month transform: {:?}",
+ input.data_type()
+ ),
+ ))
+ }
+ };
+ Ok(Some(Datum::int(val)))
+ }
}
/// Extract a date or timestamp day, as days from 1970-01-01
#[derive(Debug)]
pub struct Day;
+impl Day {
+ #[inline]
+ fn day_timestamp_micro(v: i64) -> i32 {
+ (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32
+ }
+}
+
impl TransformFunction for Day {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let res: Int32Array = match input.data_type() {
@@ -91,16 +175,12 @@ impl TransformFunction for Day {
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
- .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 }),
- DataType::Date32 => {
- input
- .as_any()
- .downcast_ref::<Date32Array>()
- .unwrap()
- .unary(|v| -> i32 {
- Date32Type::to_naive_date(v).num_days_from_ce() - DAY_SINCE_UNIX_EPOCH
- })
- }
+ .unary(|v| -> i32 { Self::day_timestamp_micro(v) }),
+ DataType::Date32 => input
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .unwrap()
+ .unary(|v| -> i32 { v }),
_ => {
return Err(Error::new(
ErrorKind::Unexpected,
@@ -113,12 +193,37 @@ impl TransformFunction for Day {
};
Ok(Arc::new(res))
}
+
+ fn transform_literal(&self, input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
+ let val = match input.literal() {
+ PrimitiveLiteral::Date(v) => *v,
+ PrimitiveLiteral::Timestamp(v) => Self::day_timestamp_micro(*v),
+ PrimitiveLiteral::TimestampTZ(v) => Self::day_timestamp_micro(*v),
+ _ => {
+ return Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for day transform: {:?}",
+ input.data_type()
+ ),
+ ))
+ }
+ };
+ Ok(Some(Datum::int(val)))
+ }
}
/// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
#[derive(Debug)]
pub struct Hour;
+impl Hour {
+ #[inline]
+ fn hour_timestamp_micro(v: i64) -> i32 {
+ (v as f64 / 1000.0 / 1000.0 * HOUR_PER_SECOND) as i32
+ }
+}
+
impl TransformFunction for Hour {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let res: Int32Array = match input.data_type() {
@@ -126,19 +231,36 @@ impl TransformFunction for Hour {
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
- .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32 }),
+ .unary(|v| -> i32 { Self::hour_timestamp_micro(v) }),
_ => {
- return Err(Error::new(
- ErrorKind::Unexpected,
+ return Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
format!(
- "Should not call internally for unsupported data type {:?}",
+ "Unsupported data type for hour transform: {:?}",
input.data_type()
),
- ))
+ ));
}
};
Ok(Arc::new(res))
}
+
+ fn transform_literal(&self, input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
+ let val = match input.literal() {
+ PrimitiveLiteral::Timestamp(v) => Self::hour_timestamp_micro(*v),
+ PrimitiveLiteral::TimestampTZ(v) => Self::hour_timestamp_micro(*v),
+ _ => {
+ return Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for hour transform: {:?}",
+ input.data_type()
+ ),
+ ))
+ }
+ };
+ Ok(Some(Datum::int(val)))
+ }
}
#[cfg(test)]
@@ -147,7 +269,10 @@ mod test {
use chrono::{NaiveDate, NaiveDateTime};
use std::sync::Arc;
- use crate::transform::TransformFunction;
+ use crate::{
+ spec::Datum,
+ transform::{BoxedTransformFunction, TransformFunction},
+ };
#[test]
fn test_transform_years() {
@@ -159,6 +284,7 @@ mod test {
NaiveDate::from_ymd_opt(2000, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2030, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2060, 1, 1).unwrap(),
+ NaiveDate::from_ymd_opt(1969, 1, 1).unwrap(),
];
let date_array: ArrayRef = Arc::new(Date32Array::from(
ori_date
@@ -171,11 +297,12 @@ mod test {
));
let res = year.transform(date_array).unwrap();
let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
- assert_eq!(res.len(), 4);
+ assert_eq!(res.len(), 5);
assert_eq!(res.value(0), 0);
assert_eq!(res.value(1), 30);
assert_eq!(res.value(2), 60);
assert_eq!(res.value(3), 90);
+ assert_eq!(res.value(4), -1);
// Test TimestampMicrosecond
let ori_timestamp = vec![
@@ -187,6 +314,8 @@ mod test {
.unwrap(),
NaiveDateTime::parse_from_str("2060-01-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
.unwrap(),
+ NaiveDateTime::parse_from_str("1969-01-01 00:00:00.00", "%Y-%m-%d %H:%M:%S.%f")
+ .unwrap(),
];
let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
ori_timestamp
@@ -207,11 +336,71 @@ mod test {
));
let res = year.transform(date_array).unwrap();
let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
- assert_eq!(res.len(), 4);
+ assert_eq!(res.len(), 5);
assert_eq!(res.value(0), 0);
assert_eq!(res.value(1), 30);
assert_eq!(res.value(2), 60);
assert_eq!(res.value(3), 90);
+ assert_eq!(res.value(4), -1);
+ }
+
+ fn test_timestamp_and_tz_transform(
+ time: &str,
+ transform: &BoxedTransformFunction,
+ expect: Datum,
+ ) {
+ let timestamp = Datum::timestamp_micros(
+ NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S.%f")
+ .unwrap()
+ .and_utc()
+ .timestamp_micros(),
+ );
+ let timestamp_tz = Datum::timestamptz_micros(
+ NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S.%f")
+ .unwrap()
+ .and_utc()
+ .timestamp_micros(),
+ );
+ let res = transform.transform_literal(&timestamp).unwrap().unwrap();
+ assert_eq!(res, expect);
+ let res = transform.transform_literal(&timestamp_tz).unwrap().unwrap();
+ assert_eq!(res, expect);
+ }
+
+ fn test_timestamp_and_tz_transform_using_i64(
+ time: i64,
+ transform: &BoxedTransformFunction,
+ expect: Datum,
+ ) {
+ let timestamp = Datum::timestamp_micros(time);
+ let timestamp_tz = Datum::timestamptz_micros(time);
+ let res = transform.transform_literal(&timestamp).unwrap().unwrap();
+ assert_eq!(res, expect);
+ let res = transform.transform_literal(&timestamp_tz).unwrap().unwrap();
+ assert_eq!(res, expect);
+ }
+
+ fn test_date(date: i32, transform: &BoxedTransformFunction, expect: Datum) {
+ let date = Datum::date(date);
+ let res = transform.transform_literal(&date).unwrap().unwrap();
+ assert_eq!(res, expect);
+ }
+
+ #[test]
+ fn test_transform_year_literal() {
+ let year = Box::new(super::Year) as BoxedTransformFunction;
+
+ // Test Date32
+ test_date(18628, &year, Datum::int(2021 - super::UNIX_EPOCH_YEAR));
+ test_date(-365, &year, Datum::int(-1));
+
+ // Test TimestampMicrosecond
+ test_timestamp_and_tz_transform_using_i64(
+ 186280000000,
+ &year,
+ Datum::int(1970 - super::UNIX_EPOCH_YEAR),
+ );
+ test_timestamp_and_tz_transform("1969-01-01 00:00:00.00", &year, Datum::int(-1));
}
#[test]
@@ -224,6 +413,7 @@ mod test {
NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(),
NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(),
NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(),
+ NaiveDate::from_ymd_opt(1969, 12, 1).unwrap(),
];
let date_array: ArrayRef = Arc::new(Date32Array::from(
ori_date
@@ -236,11 +426,12 @@ mod test {
));
let res = month.transform(date_array).unwrap();
let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
- assert_eq!(res.len(), 4);
+ assert_eq!(res.len(), 5);
assert_eq!(res.value(0), 0);
assert_eq!(res.value(1), 30 * 12 + 3);
assert_eq!(res.value(2), 60 * 12 + 6);
assert_eq!(res.value(3), 90 * 12 + 9);
+ assert_eq!(res.value(4), -1);
// Test TimestampMicrosecond
let ori_timestamp = vec![
@@ -252,6 +443,8 @@ mod test {
.unwrap(),
NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
.unwrap(),
+ NaiveDateTime::parse_from_str("1969-12-01 00:00:00.00", "%Y-%m-%d %H:%M:%S.%f")
+ .unwrap(),
];
let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
ori_timestamp
@@ -272,11 +465,36 @@ mod test {
));
let res = month.transform(date_array).unwrap();
let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
- assert_eq!(res.len(), 4);
+ assert_eq!(res.len(), 5);
assert_eq!(res.value(0), 0);
assert_eq!(res.value(1), 30 * 12 + 3);
assert_eq!(res.value(2), 60 * 12 + 6);
assert_eq!(res.value(3), 90 * 12 + 9);
+ assert_eq!(res.value(4), -1);
+ }
+
+ #[test]
+ fn test_transform_month_literal() {
+ let month = Box::new(super::Month) as BoxedTransformFunction;
+
+ // Test Date32
+ test_date(
+ 18628,
+ &month,
+ Datum::int((2021 - super::UNIX_EPOCH_YEAR) * 12),
+ );
+ test_date(-31, &month, Datum::int(-1));
+
+ // Test TimestampMicrosecond
+ test_timestamp_and_tz_transform_using_i64(
+ 186280000000,
+ &month,
+ Datum::int((1970 - super::UNIX_EPOCH_YEAR) * 12),
+ );
+ test_timestamp_and_tz_transform("1969-12-01 23:00:00.00", &month, Datum::int(-1));
+ test_timestamp_and_tz_transform("2017-12-01 00:00:00.00", &month, Datum::int(575));
+ test_timestamp_and_tz_transform("1970-01-01 00:00:00.00", &month, Datum::int(0));
+ test_timestamp_and_tz_transform("1969-12-31 00:00:00.00", &month, Datum::int(-1));
}
#[test]
@@ -287,6 +505,7 @@ mod test {
NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(),
NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(),
NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(),
+ NaiveDate::from_ymd_opt(1969, 12, 31).unwrap(),
];
let expect_day = ori_date
.clone()
@@ -309,11 +528,12 @@ mod test {
));
let res = day.transform(date_array).unwrap();
let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
- assert_eq!(res.len(), 4);
+ assert_eq!(res.len(), 5);
assert_eq!(res.value(0), expect_day[0]);
assert_eq!(res.value(1), expect_day[1]);
assert_eq!(res.value(2), expect_day[2]);
assert_eq!(res.value(3), expect_day[3]);
+ assert_eq!(res.value(4), -1);
// Test TimestampMicrosecond
let ori_timestamp = vec![
@@ -325,6 +545,8 @@ mod test {
.unwrap(),
NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
.unwrap(),
+ NaiveDateTime::parse_from_str("1969-12-31 00:00:00.00", "%Y-%m-%d %H:%M:%S.%f")
+ .unwrap(),
];
let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
ori_timestamp
@@ -345,11 +567,25 @@ mod test {
));
let res = day.transform(date_array).unwrap();
let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
- assert_eq!(res.len(), 4);
+ assert_eq!(res.len(), 5);
assert_eq!(res.value(0), expect_day[0]);
assert_eq!(res.value(1), expect_day[1]);
assert_eq!(res.value(2), expect_day[2]);
assert_eq!(res.value(3), expect_day[3]);
+ assert_eq!(res.value(4), -1);
+ }
+
+ #[test]
+ fn test_transform_days_literal() {
+ let day = Box::new(super::Day) as BoxedTransformFunction;
+ // Test Date32
+ test_date(18628, &day, Datum::int(18628));
+ test_date(-31, &day, Datum::int(-31));
+
+ // Test TimestampMicrosecond
+ test_timestamp_and_tz_transform_using_i64(1512151975038194, &day, Datum::int(17501));
+ test_timestamp_and_tz_transform_using_i64(-115200000000, &day, Datum::int(-1));
+ test_timestamp_and_tz_transform("2017-12-01 10:30:42.123", &day, Datum::int(17501));
}
#[test]
@@ -364,6 +600,8 @@ mod test {
.unwrap(),
NaiveDateTime::parse_from_str("2060-09-01 05:03:23.123", "%Y-%m-%d %H:%M:%S.%f")
.unwrap(),
+ NaiveDateTime::parse_from_str("1969-12-31 23:00:00.00", "%Y-%m-%d %H:%M:%S.%f")
+ .unwrap(),
];
let expect_hour = ori_timestamp
.clone()
@@ -401,10 +639,20 @@ mod test {
));
let res = hour.transform(date_array).unwrap();
let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
- assert_eq!(res.len(), 4);
+ assert_eq!(res.len(), 5);
assert_eq!(res.value(0), expect_hour[0]);
assert_eq!(res.value(1), expect_hour[1]);
assert_eq!(res.value(2), expect_hour[2]);
assert_eq!(res.value(3), expect_hour[3]);
+ assert_eq!(res.value(4), -1);
+ }
+
+ #[test]
+ fn test_transform_hours_literal() {
+ let hour = Box::new(super::Hour) as BoxedTransformFunction;
+
+ // Test TimestampMicrosecond
+ test_timestamp_and_tz_transform("2017-12-01 18:00:00.00", &hour, Datum::int(420042));
+ test_timestamp_and_tz_transform("1969-12-31 23:00:00.00", &hour, Datum::int(-1));
}
}
diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs
index a8ebda8..767ca00 100644
--- a/crates/iceberg/src/transform/truncate.rs
+++ b/crates/iceberg/src/transform/truncate.rs
@@ -20,7 +20,10 @@ use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_schema::DataType;
-use crate::Error;
+use crate::{
+ spec::{Datum, PrimitiveLiteral},
+ Error,
+};
use super::TransformFunction;
@@ -34,12 +37,28 @@ impl Truncate {
Self { width }
}
- fn truncate_str_by_char(s: &str, max_chars: usize) -> &str {
- match s.char_indices().nth(max_chars) {
+ #[inline]
+ fn truncate_str(s: &str, width: usize) -> &str {
+ match s.char_indices().nth(width) {
None => s,
Some((idx, _)) => &s[..idx],
}
}
+
+ #[inline]
+ fn truncate_i32(v: i32, width: i32) -> i32 {
+ v - v.rem_euclid(width)
+ }
+
+ #[inline]
+ fn truncate_i64(v: i64, width: i64) -> i64 {
+ v - (((v % width) + width) % width)
+ }
+
+ #[inline]
+ fn truncate_decimal_i128(v: i128, width: i128) -> i128 {
+ v - (((v % width) + width) % width)
+ }
}
impl TransformFunction for Truncate {
@@ -56,7 +75,7 @@ impl TransformFunction for Truncate {
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap()
- .unary(|v| v - v.rem_euclid(width));
+ .unary(|v| Self::truncate_i32(v, width));
Ok(Arc::new(res))
}
DataType::Int64 => {
@@ -65,7 +84,7 @@ impl TransformFunction for Truncate {
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.unwrap()
- .unary(|v| v - (((v % width) + width) % width));
+ .unary(|v| Self::truncate_i64(v, width));
Ok(Arc::new(res))
}
DataType::Decimal128(precision, scale) => {
@@ -74,7 +93,7 @@ impl TransformFunction for Truncate {
.as_any()
.downcast_ref::<arrow_array::Decimal128Array>()
.unwrap()
- .unary(|v| v - (((v % width) + width) % width))
+ .unary(|v| Self::truncate_decimal_i128(v, width))
.with_precision_and_scale(*precision, *scale)
.map_err(|err| Error::new(crate::ErrorKind::Unexpected, format!("{err}")))?;
Ok(Arc::new(res))
@@ -87,7 +106,7 @@ impl TransformFunction for Truncate {
.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
- .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))),
+ .map(|v| v.map(|v| Self::truncate_str(v, len))),
);
Ok(Arc::new(res))
}
@@ -99,11 +118,50 @@ impl TransformFunction for Truncate {
.downcast_ref::<arrow_array::LargeStringArray>()
.unwrap()
.iter()
- .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))),
+ .map(|v| v.map(|v| Self::truncate_str(v, len))),
);
Ok(Arc::new(res))
}
- _ => unreachable!("Truncate transform only supports (int,long,decimal,string) types"),
+ _ => Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for truncate transform: {:?}",
+ input.data_type()
+ ),
+ )),
+ }
+ }
+
+ fn transform_literal(&self, input: &Datum) -> crate::Result<Option<Datum>> {
+ match input.literal() {
+ PrimitiveLiteral::Int(v) => Ok(Some({
+ let width: i32 = self.width.try_into().map_err(|_| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "width is failed to convert to i32 when truncate Int32Array",
+ )
+ })?;
+ Datum::int(Self::truncate_i32(*v, width))
+ })),
+ PrimitiveLiteral::Long(v) => Ok(Some({
+ let width = self.width as i64;
+ Datum::long(Self::truncate_i64(*v, width))
+ })),
+ PrimitiveLiteral::Decimal(v) => Ok(Some({
+ let width = self.width as i128;
+ Datum::decimal(Self::truncate_decimal_i128(*v, width))?
+ })),
+ PrimitiveLiteral::String(v) => Ok(Some({
+ let len = self.width as usize;
+ Datum::string(Self::truncate_str(v, len).to_string())
+ })),
+ _ => Err(crate::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ format!(
+ "Unsupported data type for truncate transform: {:?}",
+ input.data_type()
+ ),
+ )),
}
}
}
@@ -116,7 +174,7 @@ mod test {
builder::PrimitiveBuilder, types::Decimal128Type, Decimal128Array, Int32Array, Int64Array,
};
- use crate::transform::TransformFunction;
+ use crate::{spec::Datum, transform::TransformFunction};
// Test case ref from: https://iceberg.apache.org/spec/#truncate-transform-details
#[test]
@@ -187,32 +245,74 @@ mod test {
fn test_string_truncate() {
let test1 = "イロハニホヘト";
let test1_2_expected = "イロ";
- assert_eq!(
- super::Truncate::truncate_str_by_char(test1, 2),
- test1_2_expected
- );
+ assert_eq!(super::Truncate::truncate_str(test1, 2), test1_2_expected);
let test1_3_expected = "イロハ";
- assert_eq!(
- super::Truncate::truncate_str_by_char(test1, 3),
- test1_3_expected
- );
+ assert_eq!(super::Truncate::truncate_str(test1, 3), test1_3_expected);
let test2 = "щщаεはчωいにπάほхεろへσκζ";
let test2_7_expected = "щщаεはчω";
- assert_eq!(
- super::Truncate::truncate_str_by_char(test2, 7),
- test2_7_expected
- );
+ assert_eq!(super::Truncate::truncate_str(test2, 7), test2_7_expected);
let test3 = "\u{FFFF}\u{FFFF}";
- assert_eq!(super::Truncate::truncate_str_by_char(test3, 2), test3);
+ assert_eq!(super::Truncate::truncate_str(test3, 2), test3);
let test4 = "\u{10000}\u{10000}";
let test4_1_expected = "\u{10000}";
- assert_eq!(
- super::Truncate::truncate_str_by_char(test4, 1),
- test4_1_expected
- );
+ assert_eq!(super::Truncate::truncate_str(test4, 1), test4_1_expected);
+ }
+
+ #[test]
+ fn test_literal_int() {
+ let input = Datum::int(1);
+ let res = super::Truncate::new(10)
+ .transform_literal(&input)
+ .unwrap()
+ .unwrap();
+ assert_eq!(res, Datum::int(0),);
+
+ let input = Datum::int(-1);
+ let res = super::Truncate::new(10)
+ .transform_literal(&input)
+ .unwrap()
+ .unwrap();
+ assert_eq!(res, Datum::int(-10),);
+ }
+
+ #[test]
+ fn test_literal_long() {
+ let input = Datum::long(1);
+ let res = super::Truncate::new(10)
+ .transform_literal(&input)
+ .unwrap()
+ .unwrap();
+ assert_eq!(res, Datum::long(0),);
+
+ let input = Datum::long(-1);
+ let res = super::Truncate::new(10)
+ .transform_literal(&input)
+ .unwrap()
+ .unwrap();
+ assert_eq!(res, Datum::long(-10),);
+ }
+
+ #[test]
+ fn test_decimal_literal() {
+ let input = Datum::decimal(1065).unwrap();
+ let res = super::Truncate::new(50)
+ .transform_literal(&input)
+ .unwrap()
+ .unwrap();
+ assert_eq!(res, Datum::decimal(1050).unwrap(),);
+ }
+
+ #[test]
+ fn test_string_literal() {
+ let input = Datum::string("iceberg".to_string());
+ let res = super::Truncate::new(3)
+ .transform_literal(&input)
+ .unwrap()
+ .unwrap();
+ assert_eq!(res, Datum::string("ice".to_string()),);
}
}
diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs
index d419430..7cbee27 100644
--- a/crates/iceberg/src/transform/void.rs
+++ b/crates/iceberg/src/transform/void.rs
@@ -27,4 +27,8 @@ impl TransformFunction for Void {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
Ok(new_null_array(input.data_type(), input.len()))
}
+
+ fn transform_literal(&self, _input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
+ Ok(None)
+ }
}