This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 3c0e332a8 AVRO-3853: [Rust] Support Local timestamp logical types for Rust SDK (#2491)
3c0e332a8 is described below
commit 3c0e332a860f63a3dbb88c9fdd2f090cd865bc53
Author: Kousuke Saruta <[email protected]>
AuthorDate: Tue Sep 12 05:35:05 2023 +0900
AVRO-3853: [Rust] Support Local timestamp logical types for Rust SDK (#2491)
* AVRO-3853: [Rust] Support Local timestamp logical types for Rust SDK
(cherry picked from commit 9ec835d62568adea1d8a74bc13bd82b402753882)
---
lang/rust/avro/README.md | 13 +++++++++
lang/rust/avro/src/de.rs | 50 +++++++++++++++++++++++++++++++++--
lang/rust/avro/src/decode.rs | 2 ++
lang/rust/avro/src/encode.rs | 2 ++
lang/rust/avro/src/error.rs | 6 +++++
lang/rust/avro/src/lib.rs | 13 +++++++++
lang/rust/avro/src/schema.rs | 38 ++++++++++++++++++++++++++
lang/rust/avro/src/types.rs | 60 ++++++++++++++++++++++++++++++++++++++++++
lang/rust/avro/tests/schema.rs | 38 ++++++++++++++++++++++++++
9 files changed, 220 insertions(+), 2 deletions(-)
diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md
index 4155785a2..ad5ec7068 100644
--- a/lang/rust/avro/README.md
+++ b/lang/rust/avro/README.md
@@ -432,6 +432,7 @@ fn main() -> Result<(), Error> {
1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
1. Date, Time (milli) as `i32` and Time (micro) as `i64`
1. Timestamp (milli and micro) as `i64`
+1. Local timestamp (milli and micro) as `i64`
1. Duration as a custom type with `months`, `days` and `millis` accessor methods each of which returns an `i32`
Note that the on-disk representation is identical to the underlying primitive/complex type.
@@ -499,6 +500,16 @@ fn main() -> Result<(), Error> {
"type": "long",
"logicalType": "timestamp-micros"
},
+ {
+ "name": "local_timestamp_millis",
+ "type": "long",
+ "logicalType": "local-timestamp-millis"
+ },
+ {
+ "name": "local_timestamp_micros",
+ "type": "long",
+ "logicalType": "local-timestamp-micros"
+ },
{
"name": "duration",
"type": {
@@ -527,6 +538,8 @@ fn main() -> Result<(), Error> {
record.put("time_micros", Value::TimeMicros(3));
record.put("timestamp_millis", Value::TimestampMillis(4));
record.put("timestamp_micros", Value::TimestampMicros(5));
+ record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
+ record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
record.put("duration", Duration::new(Months::new(6), Days::new(7),
Millis::new(8)));
writer.append(record)?;
diff --git a/lang/rust/avro/src/de.rs b/lang/rust/avro/src/de.rs
index 601a90961..660056448 100644
--- a/lang/rust/avro/src/de.rs
+++ b/lang/rust/avro/src/de.rs
@@ -244,7 +244,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a
Deserializer<'de> {
Value::Long(i)
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
- | Value::TimestampMicros(i) => visitor.visit_i64(*i),
+ | Value::TimestampMicros(i)
+ | Value::LocalTimestampMillis(i)
+ | Value::LocalTimestampMicros(i) => visitor.visit_i64(*i),
&Value::Float(f) => visitor.visit_f32(f),
&Value::Double(d) => visitor.visit_f64(d),
Value::Union(_i, u) => match **u {
@@ -254,7 +256,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a
Deserializer<'de> {
Value::Long(i)
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
- | Value::TimestampMicros(i) => visitor.visit_i64(i),
+ | Value::TimestampMicros(i)
+ | Value::LocalTimestampMillis(i)
+ | Value::LocalTimestampMicros(i) => visitor.visit_i64(i),
Value::Float(f) => visitor.visit_f32(f),
Value::Double(d) => visitor.visit_f64(d),
Value::Record(ref fields) =>
visitor.visit_map(RecordDeserializer::new(fields)),
@@ -1073,6 +1077,24 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_avro_3853_local_timestamp_millis() -> TestResult {
+ let raw_value = 1;
+ let value = Value::LocalTimestampMillis(raw_value);
+ let result = crate::from_value::<i64>(&value)?;
+ assert_eq!(result, raw_value);
+ Ok(())
+ }
+
+ #[test]
+ fn test_avro_3853_local_timestamp_micros() -> TestResult {
+ let raw_value = 1;
+ let value = Value::LocalTimestampMicros(raw_value);
+ let result = crate::from_value::<i64>(&value)?;
+ assert_eq!(result, raw_value);
+ Ok(())
+ }
+
#[test]
fn test_from_value_uuid_str() -> TestResult {
let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49";
@@ -1116,6 +1138,8 @@ mod tests {
("time_micros_a".to_string(), 123),
("timestamp_millis_b".to_string(), 234),
("timestamp_micros_c".to_string(), 345),
+ ("local_timestamp_millis_d".to_string(), 678),
+ ("local_timestamp_micros_e".to_string(), 789),
]
.iter()
.cloned()
@@ -1132,6 +1156,12 @@ mod tests {
key if key.starts_with("timestamp_micros_") => {
(k.clone(), Value::TimestampMicros(*v))
}
+ key if key.starts_with("local_timestamp_millis_") => {
+ (k.clone(), Value::LocalTimestampMillis(*v))
+ }
+ key if key.starts_with("local_timestamp_micros_") => {
+ (k.clone(), Value::LocalTimestampMicros(*v))
+ }
_ => unreachable!("unexpected key: {:?}", k),
})
.collect();
@@ -1181,6 +1211,22 @@ mod tests {
"a_non_existing_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::TimestampMicros(-345))),
),
+ (
+ "a_local_timestamp_millis".to_string(),
+ Value::Union(0, Box::new(Value::LocalTimestampMillis(678))),
+ ),
+ (
+ "a_non_existing_local_timestamp_millis".to_string(),
+ Value::Union(0, Box::new(Value::LocalTimestampMillis(-678))),
+ ),
+ (
+ "a_local_timestamp_micros".to_string(),
+ Value::Union(0, Box::new(Value::LocalTimestampMicros(789))),
+ ),
+ (
+ "a_non_existing_local_timestamp_micros".to_string(),
+ Value::Union(0, Box::new(Value::LocalTimestampMicros(-789))),
+ ),
(
"a_record".to_string(),
Value::Union(
diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index debb38076..b13c76739 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -130,6 +130,8 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
+ Schema::LocalTimestampMillis =>
zag_i64(reader).map(Value::LocalTimestampMillis),
+ Schema::LocalTimestampMicros =>
zag_i64(reader).map(Value::LocalTimestampMicros),
Schema::Duration => {
let mut buf = [0u8; 12];
reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index e8080f04b..6e52e0c3b 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -77,6 +77,8 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
Value::Long(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
+ | Value::LocalTimestampMillis(i)
+ | Value::LocalTimestampMicros(i)
| Value::TimeMicros(i) => encode_long(*i, buffer),
Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 447d2711a..bf066b8a5 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -151,6 +151,12 @@ pub enum Error {
#[error("TimestampMicros expected, got {0:?}")]
GetTimestampMicros(ValueKind),
+ #[error("LocalTimestampMillis expected, got {0:?}")]
+ GetLocalTimestampMillis(ValueKind),
+
+ #[error("LocalTimestampMicros expected, got {0:?}")]
+ GetLocalTimestampMicros(ValueKind),
+
#[error("Null expected, got {0:?}")]
GetNull(ValueKind),
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index cde0251c5..35b1b431a 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -545,6 +545,7 @@
//! 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
//! 1. Date, Time (milli) as `i32` and Time (micro) as `i64`
//! 1. Timestamp (milli and micro) as `i64`
+//! 1. Local timestamp (milli and micro) as `i64`
//! 1. Duration as a custom type with `months`, `days` and `millis` accessor methods each of which returns an `i32`
//!
//! Note that the on-disk representation is identical to the underlying primitive/complex type.
@@ -613,6 +614,16 @@
//! "logicalType": "timestamp-micros"
//! },
//! {
+//! "name": "local_timestamp_millis",
+//! "type": "long",
+//! "logicalType": "local-timestamp-millis"
+//! },
+//! {
+//! "name": "local_timestamp_micros",
+//! "type": "long",
+//! "logicalType": "local-timestamp-micros"
+//! },
+//! {
//! "name": "duration",
//! "type": {
//! "type": "fixed",
@@ -640,6 +651,8 @@
//! record.put("time_micros", Value::TimeMicros(3));
//! record.put("timestamp_millis", Value::TimestampMillis(4));
//! record.put("timestamp_micros", Value::TimestampMicros(5));
+//! record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
+//! record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
//! record.put("duration", Duration::new(Months::new(6), Days::new(7),
Millis::new(8)));
//!
//! writer.append(record)?;
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 8d99b1201..5a05885f9 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -127,6 +127,10 @@ pub enum Schema {
TimestampMillis,
/// An instant in time represented as the number of microseconds after the UNIX epoch.
TimestampMicros,
+ /// An instant in local time represented as the number of milliseconds after the UNIX epoch.
+ LocalTimestampMillis,
+ /// An instant in local time represented as the number of microseconds after the UNIX epoch.
+ LocalTimestampMicros,
/// An amount of time defined by a number of months, days and milliseconds.
Duration,
/// A reference to another schema.
@@ -191,6 +195,8 @@ impl From<&types::Value> for SchemaKind {
Value::TimeMicros(_) => Self::TimeMicros,
Value::TimestampMillis(_) => Self::TimestampMillis,
Value::TimestampMicros(_) => Self::TimestampMicros,
+ Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
+ Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
Value::Duration { .. } => Self::Duration,
}
}
@@ -1388,6 +1394,26 @@ impl Parser {
enclosing_namespace,
);
}
+ "local-timestamp-millis" => {
+ return try_logical_type(
+ "local-timestamp-millis",
+ complex,
+ &[SchemaKind::Long],
+ Schema::LocalTimestampMillis,
+ self,
+ enclosing_namespace,
+ );
+ }
+ "local-timestamp-micros" => {
+ return try_logical_type(
+ "local-timestamp-micros",
+ complex,
+ &[SchemaKind::Long],
+ Schema::LocalTimestampMicros,
+ self,
+ enclosing_namespace,
+ );
+ }
"duration" => {
logical_verify_type(complex, &[SchemaKind::Fixed], self,
enclosing_namespace)?;
return Ok(Schema::Duration);
@@ -1901,6 +1927,18 @@ impl Serialize for Schema {
map.serialize_entry("logicalType", "timestamp-micros")?;
map.end()
}
+ Schema::LocalTimestampMillis => {
+ let mut map = serializer.serialize_map(None)?;
+ map.serialize_entry("type", "long")?;
+ map.serialize_entry("logicalType", "local-timestamp-millis")?;
+ map.end()
+ }
+ Schema::LocalTimestampMicros => {
+ let mut map = serializer.serialize_map(None)?;
+ map.serialize_entry("type", "long")?;
+ map.serialize_entry("logicalType", "local-timestamp-micros")?;
+ map.end()
+ }
Schema::Duration => {
let mut map = serializer.serialize_map(None)?;
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 11bf63564..9bb607705 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -108,6 +108,10 @@ pub enum Value {
TimestampMillis(i64),
/// Timestamp in microseconds.
TimestampMicros(i64),
+ /// Local timestamp in milliseconds.
+ LocalTimestampMillis(i64),
+ /// Local timestamp in microseconds.
+ LocalTimestampMicros(i64),
/// Avro Duration. An amount of time defined by months, days and milliseconds.
Duration(Duration),
/// Universally unique identifier.
@@ -327,6 +331,8 @@ impl TryFrom<Value> for JsonValue {
Value::TimeMicros(t) => Ok(Self::Number(t.into())),
Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
+ Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
+ Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
Value::Duration(d) => Ok(Self::Array(
<[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
)),
@@ -409,8 +415,12 @@ impl Value {
(&Value::Long(_), &Schema::TimeMicros) => None,
(&Value::Long(_), &Schema::TimestampMillis) => None,
(&Value::Long(_), &Schema::TimestampMicros) => None,
+ (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
+ (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
(&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
(&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
+ (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros)
=> None,
+ (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis)
=> None,
(&Value::TimeMicros(_), &Schema::TimeMicros) => None,
(&Value::TimeMillis(_), &Schema::TimeMillis) => None,
(&Value::Date(_), &Schema::Date) => None,
@@ -669,6 +679,8 @@ impl Value {
Schema::TimeMicros => self.resolve_time_micros(),
Schema::TimestampMillis => self.resolve_timestamp_millis(),
Schema::TimestampMicros => self.resolve_timestamp_micros(),
+ Schema::LocalTimestampMillis =>
self.resolve_local_timestamp_millis(),
+ Schema::LocalTimestampMicros =>
self.resolve_local_timestamp_micros(),
Schema::Duration => self.resolve_duration(),
Schema::Uuid => self.resolve_uuid(),
}
@@ -784,6 +796,26 @@ impl Value {
}
}
+ fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
+ match self {
+ Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
+ Ok(Value::LocalTimestampMillis(ts))
+ }
+ Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
+ other => Err(Error::GetLocalTimestampMillis(other.into())),
+ }
+ }
+
+ fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
+ match self {
+ Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
+ Ok(Value::LocalTimestampMicros(ts))
+ }
+ Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
+ other => Err(Error::GetLocalTimestampMicros(other.into())),
+ }
+ }
+
fn resolve_null(self) -> Result<Self, Error> {
match self {
Value::Null => Ok(Value::Null),
@@ -1681,6 +1713,26 @@ Field with name '"b"' is not a member of the map items"#,
assert!(value.resolve(&Schema::TimestampMicros).is_err());
}
+ #[test]
+ fn test_avro_3853_resolve_timestamp_millis() {
+ let value = Value::LocalTimestampMillis(10);
+ assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
+ assert!(value.resolve(&Schema::Float).is_err());
+
+ let value = Value::Float(10.0f32);
+ assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
+ }
+
+ #[test]
+ fn test_avro_3853_resolve_timestamp_micros() {
+ let value = Value::LocalTimestampMicros(10);
+ assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
+ assert!(value.resolve(&Schema::Int).is_err());
+
+ let value = Value::Double(10.0);
+ assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
+ }
+
#[test]
fn resolve_duration() {
let value = Value::Duration(Duration::new(
@@ -1886,6 +1938,14 @@ Field with name '"b"' is not a member of the map items"#,
JsonValue::try_from(Value::TimestampMicros(1))?,
JsonValue::Number(1.into())
);
+ assert_eq!(
+ JsonValue::try_from(Value::LocalTimestampMillis(1))?,
+ JsonValue::Number(1.into())
+ );
+ assert_eq!(
+ JsonValue::try_from(Value::LocalTimestampMicros(1))?,
+ JsonValue::Number(1.into())
+ );
assert_eq!(
JsonValue::try_from(Value::Duration(
[1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8,
12u8].into()
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index 1d2cb8b4d..63b730560 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -592,6 +592,42 @@ const TIMESTAMPMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[
),
];
+const LOCAL_TIMESTAMPMILLIS_LOGICAL_TYPE: &[(&str, bool)] = &[
+ (
+ r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
+ true,
+ ),
+ // this is valid even though its logical type is "local-timestamp-milis" (missing the second "l"), because
+ // unknown logical types are ignored
+ (
+ r#"{"type": "long", "logicalType": "local-timestamp-milis"}"#,
+ true,
+ ),
+ (
+ // this is still valid because unknown logicalType should be ignored
+ r#"{"type": "int", "logicalType": "local-timestamp-millis"}"#,
+ true,
+ ),
+];
+
+const LOCAL_TIMESTAMPMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[
+ (
+ r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
+ true,
+ ),
+ // this is valid even though its logical type is "local-timestamp-micro" (missing the last "s"), because
+ // unknown logical types are ignored
+ (
+ r#"{"type": "long", "logicalType": "local-timestamp-micro"}"#,
+ true,
+ ),
+ (
+ // this is still valid because unknown logicalType should be ignored
+ r#"{"type": "int", "logicalType": "local-timestamp-micros"}"#,
+ true,
+ ),
+];
+
lazy_static! {
static ref EXAMPLES: Vec<(&'static str, bool)> = Vec::new()
.iter()
@@ -612,6 +648,8 @@ lazy_static! {
.chain(TIMEMICROS_LOGICAL_TYPE.iter().copied())
.chain(TIMESTAMPMILLIS_LOGICAL_TYPE.iter().copied())
.chain(TIMESTAMPMICROS_LOGICAL_TYPE.iter().copied())
+ .chain(LOCAL_TIMESTAMPMILLIS_LOGICAL_TYPE.iter().copied())
+ .chain(LOCAL_TIMESTAMPMICROS_LOGICAL_TYPE.iter().copied())
.collect();
static ref VALID_EXAMPLES: Vec<(&'static str, bool)> =
EXAMPLES.iter().copied().filter(|s| s.1).collect();