jecsand838 commented on code in PR #8433:
URL: https://github.com/apache/arrow-rs/pull/8433#discussion_r2376947171
##########
arrow-avro/src/codec.rs:
##########
@@ -685,6 +686,8 @@ pub enum Codec {
Interval,
/// Represents Avro union type, maps to Arrow's Union data type
Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
+ /// Represents an Avro long with an `arrowDurationUnit` metadata property. Maps to Arrow's Duration(TimeUnit) data type.
+ Duration(TimeUnit),
Review Comment:
So I thought about this some more and I think the cleanest way to do this is
to create the following custom logical types which annotate a `long` primitive
type:
1. `arrow-duration-nanos`
2. `arrow-duration-micros`
3. `arrow-duration-millis`
4. `arrow-duration-seconds`
I'd also probably add a new feature flag called `avro_custom_types` or
something along those lines. When the flag is on, we'd use the logical
types and map to/from `DataType::Duration`. When the flag is off, we'd
simply read/write the value as its underlying primitive, i.e. the `long`.
```suggestion
#[cfg(feature = "avro_custom_types")]
DurationNano,
#[cfg(feature = "avro_custom_types")]
DurationMicros,
#[cfg(feature = "avro_custom_types")]
DurationMillis,
#[cfg(feature = "avro_custom_types")]
DurationSeconds,
```
@alamb @scovich I'm curious what you all think about this approach. The
problem stems from Avro's lack of support for an elapsed-time type: the
`duration` logical type in Avro represents calendar time (months, days, and
milliseconds), not a fixed span in a single unit.
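As a rough sketch of the mapping proposed above (not the PR's actual code), the four logical type names could resolve to time units as follows. `TimeUnit` here is a local stand-in for `arrow_schema::TimeUnit`, and both helper names are hypothetical.

```rust
/// Stand-in for `arrow_schema::TimeUnit`, redefined so the sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Hypothetical helper: resolve one of the proposed custom logical type
/// names to the unit of the Arrow `Duration` it would map to.
/// Returns `None` for any other logical type name.
pub fn duration_unit(logical_type: &str) -> Option<TimeUnit> {
    match logical_type {
        "arrow-duration-nanos" => Some(TimeUnit::Nanosecond),
        "arrow-duration-micros" => Some(TimeUnit::Microsecond),
        "arrow-duration-millis" => Some(TimeUnit::Millisecond),
        "arrow-duration-seconds" => Some(TimeUnit::Second),
        _ => None,
    }
}

/// Hypothetical helper: the inverse mapping, which a writer would need
/// when annotating the Avro `long`.
pub fn logical_type_name(unit: TimeUnit) -> &'static str {
    match unit {
        TimeUnit::Nanosecond => "arrow-duration-nanos",
        TimeUnit::Microsecond => "arrow-duration-micros",
        TimeUnit::Millisecond => "arrow-duration-millis",
        TimeUnit::Second => "arrow-duration-seconds",
    }
}
```

Keeping the two directions next to each other makes it easy to check that every unit round-trips through its logical type name.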
##########
arrow-avro/src/reader/record.rs:
##########
@@ -219,6 +220,10 @@ enum Decoder {
Boolean(BooleanBufferBuilder),
Int32(Vec<i32>),
Int64(Vec<i64>),
+ DurationSecond(Vec<i64>),
+ DurationMillisecond(Vec<i64>),
+ DurationMicrosecond(Vec<i64>),
+ DurationNanosecond(Vec<i64>),
Review Comment:
```suggestion
#[cfg(feature = "avro_custom_types")]
DurationSecond(Vec<i64>),
#[cfg(feature = "avro_custom_types")]
DurationMillisecond(Vec<i64>),
#[cfg(feature = "avro_custom_types")]
DurationMicrosecond(Vec<i64>),
#[cfg(feature = "avro_custom_types")]
DurationNanosecond(Vec<i64>),
```
If the feature flag is off, we'd just read it as an `Int64`.
##########
arrow-avro/src/codec.rs:
##########
@@ -759,6 +762,7 @@ impl Codec {
)
}
Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
+ Self::Duration(time_unit) => DataType::Duration(*time_unit),
Review Comment:
```suggestion
#[cfg(feature = "avro_custom_types")]
Self::DurationNano => DataType::Duration(TimeUnit::Nanosecond),
#[cfg(feature = "avro_custom_types")]
Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
#[cfg(feature = "avro_custom_types")]
Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
#[cfg(feature = "avro_custom_types")]
Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
```
##########
arrow-avro/src/codec.rs:
##########
@@ -685,6 +686,8 @@ pub enum Codec {
Interval,
/// Represents Avro union type, maps to Arrow's Union data type
Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
+ /// Represents an Avro long with an `arrowDurationUnit` metadata property. Maps to Arrow's Duration(TimeUnit) data type.
+ Duration(TimeUnit),
Review Comment:
This is a tricky one. The more I think about it, the more I realize Avro
doesn't have a good way to measure elapsed time. I almost think we need a
custom logical type.
https://stackoverflow.com/questions/49034266/how-to-define-a-logicaltype-in-avro-java
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -4747,6 +4749,52 @@ mod test {
}
}
+ #[test]
+ fn test_long_with_duration_annotation() {
+ let _avro_schema_json = r#"
+ {
+ "type": "record",
+ "name": "TestEvents",
+ "fields": [
+ {
+ "name": "event_duration",
+ "type": "long",
+ "arrowDurationUnit": "millisecond"
+ }
+ ]
+ }
+ "#;
+
+ let values = DurationMillisecondArray::from(vec![1000i64, 2000, 3000]);
+ let arrow_field = Field::new(
+ "event_duration",
+ DataType::Duration(TimeUnit::Millisecond),
+ false,
+ );
+ let arrow_schema = Arc::new(Schema::new(vec![arrow_field]));
+ let expected = RecordBatch::try_new(
+ arrow_schema.clone(),
+ vec![Arc::new(values.clone()) as ArrayRef],
+ )
+ .unwrap();
+
+ // Write to in-memory OCF using the Avro writer
+ let buffer = Vec::<u8>::new();
+ let mut writer = AvroWriter::new(buffer, (*arrow_schema).clone()).unwrap();
+ writer.write(&expected).unwrap();
+ writer.finish().unwrap();
+ let bytes = writer.into_inner();
+
+ let mut reader = ReaderBuilder::new()
+ .build(Cursor::new(bytes))
+ .expect("build reader for in-memory OCF");
+ let out_schema = reader.schema();
+ let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+ let actual = arrow::compute::concat_batches(&out_schema, &batches).unwrap();
+
+ assert_eq!(actual, expected);
+ }
Review Comment:
In the test file, I'd be sure to include the new custom logical types.
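For example, the test fixtures could annotate the `long` with a `logicalType` instead of the `arrowDurationUnit` property. The following is only a sketch: the helper and constant names are hypothetical, and the record and field names are borrowed from the test above.

```rust
/// The four logical type names proposed in this review.
pub const DURATION_LOGICAL_TYPES: [&str; 4] = [
    "arrow-duration-nanos",
    "arrow-duration-micros",
    "arrow-duration-millis",
    "arrow-duration-seconds",
];

/// Hypothetical helper: build an Avro record schema (as JSON text) with a
/// single `long` field annotated with the given logical type name.
pub fn duration_test_schema(logical_type: &str) -> String {
    // `{{` and `}}` are literal braces; `{logical_type}` is interpolated.
    format!(
        r#"{{"type":"record","name":"TestEvents","fields":[{{"name":"event_duration","type":{{"type":"long","logicalType":"{logical_type}"}}}}]}}"#
    )
}
```

A test could then loop over `DURATION_LOGICAL_TYPES` and run the same round trip for each unit.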
##########
arrow-avro/src/codec.rs:
##########
@@ -1324,8 +1329,26 @@ impl<'a> Maker<'a> {
(None, _) => {}
}
if !t.attributes.additional.is_empty() {
+ let mut is_duration = false;
for (k, v) in &t.attributes.additional {
- field.metadata.insert(k.to_string(), v.to_string());
+ let key = k.to_string();
+ if matches!(field.codec, Codec::Int64) && key == "arrowDurationUnit" {
+ let unit = match v.as_str() {
+ Some("second") => TimeUnit::Second,
+ Some("millisecond") => TimeUnit::Millisecond,
+ Some("microsecond") => TimeUnit::Microsecond,
+ Some("nanosecond") => TimeUnit::Nanosecond,
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Unknown arrowDurationUnit value: {other:?}"
+ )))
+ }
+ };
+ field.codec = Codec::Duration(unit);
+ is_duration = true;
+ } else {
+ field.metadata.insert(key, v.to_string());
+ }
Review Comment:
Now we can move this up into `match (t.attributes.logical_type, &mut
field.codec)` with the other logical types.
```suggestion
field.metadata.insert(k.to_string(), v.to_string());
```
i.e.
```rust
match (t.attributes.logical_type, &mut field.codec) {
    (Some("decimal"), c @ Codec::Binary) => {
        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
        *c = Codec::Decimal(prec, Some(sc), None);
    }
    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
    (Some("timestamp-millis"), c @ Codec::Int64) => {
        *c = Codec::TimestampMillis(true)
    }
    (Some("timestamp-micros"), c @ Codec::Int64) => {
        *c = Codec::TimestampMicros(true)
    }
    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
        *c = Codec::TimestampMillis(false)
    }
    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
        *c = Codec::TimestampMicros(false)
    }
    (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
    #[cfg(feature = "avro_custom_types")]
    (Some("arrow-duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNano,
    #[cfg(feature = "avro_custom_types")]
    (Some("arrow-duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
    #[cfg(feature = "avro_custom_types")]
    (Some("arrow-duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
    #[cfg(feature = "avro_custom_types")]
    (Some("arrow-duration-seconds"), c @ Codec::Int64) => *c = Codec::DurationSeconds,
    (Some(logical), _) => {
        // Insert unrecognized logical type into metadata map
        field.metadata.insert("logicalType".into(), logical.into());
    }
    (None, _) => {}
}
```
##########
arrow-avro/src/reader/record.rs:
##########
@@ -342,6 +347,18 @@ impl Decoder {
(Codec::TimestampMicros(is_utc), _) => {
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
}
+ (Codec::Duration(unit), _) => match unit {
+ TimeUnit::Second => Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY)),
+ TimeUnit::Millisecond => {
+ Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
+ }
+ TimeUnit::Microsecond => {
+ Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
+ }
+ TimeUnit::Nanosecond => {
+ Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
+ }
+ },
Review Comment:
I'd feature flag this and the other adjacent changes as well imo.
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -270,10 +271,21 @@ impl<'a> FieldEncoder<'a> {
array.as_primitive::<IntervalDayTimeType>(),
)),
}
- DataType::Duration(_) => {
- return Err(ArrowError::NotYetImplemented(
- "Avro writer: Arrow Duration(TimeUnit) has no standard Avro mapping; cast to Interval(MonthDayNano) to use Avro 'duration'".into(),
- ));
+ DataType::Duration(tu) => {
+ match tu {
+ TimeUnit::Second => Encoder::DurationSec(LongEncoder(
+ array.as_primitive::<DurationSecondType>(),
+ )),
+ TimeUnit::Millisecond => Encoder::DurationMs(LongEncoder(
+ array.as_primitive::<DurationMillisecondType>(),
+ )),
+ TimeUnit::Microsecond => Encoder::DurationUs(LongEncoder(
+ array.as_primitive::<DurationMicrosecondType>(),
+ )),
+ TimeUnit::Nanosecond => Encoder::DurationNs(LongEncoder(
+ array.as_primitive::<DurationNanosecondType>(),
+ )),
+ }
Review Comment:
I'd use `#[cfg(feature = "avro_custom_types")]` in the encoder as well.
If `avro_custom_types` is on, write an Avro `long` annotated with the
matching `logicalType`. If `avro_custom_types` is off, just
encode `DataType::Duration(_)` as a regular long/int64.
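Roughly, the writer-side schema choice might look like the sketch below. It is illustrative only: `avro_type_for_duration` is a hypothetical helper, `TimeUnit` is a local stand-in for `arrow_schema::TimeUnit`, and the `custom_types_enabled` argument stands in for the compile-time `avro_custom_types` feature so that both paths can be exercised in one build. The encoded value is the same `i64` in either case; only the schema annotation differs.

```rust
/// Stand-in for `arrow_schema::TimeUnit` so the sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Hypothetical helper: the Avro type (as JSON text) a writer would emit
/// for an Arrow `Duration(unit)` column.
pub fn avro_type_for_duration(unit: TimeUnit, custom_types_enabled: bool) -> String {
    if !custom_types_enabled {
        // Flag off: fall back to the plain primitive.
        return "\"long\"".to_string();
    }
    // Flag on: annotate the long with the proposed custom logical type.
    let logical = match unit {
        TimeUnit::Nanosecond => "arrow-duration-nanos",
        TimeUnit::Microsecond => "arrow-duration-micros",
        TimeUnit::Millisecond => "arrow-duration-millis",
        TimeUnit::Second => "arrow-duration-seconds",
    };
    format!(r#"{{"type":"long","logicalType":"{logical}"}}"#)
}
```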