This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new 3c0e332a8 AVRO-3853: [Rust] Support Local timestamp logical types for 
Rust SDK (#2491)
3c0e332a8 is described below

commit 3c0e332a860f63a3dbb88c9fdd2f090cd865bc53
Author: Kousuke Saruta <[email protected]>
AuthorDate: Tue Sep 12 05:35:05 2023 +0900

    AVRO-3853: [Rust] Support Local timestamp logical types for Rust SDK (#2491)
    
    * AVRO-3853: [Rust] Support Local timestamp logical types for Rust SDK
    
    (cherry picked from commit 9ec835d62568adea1d8a74bc13bd82b402753882)
---
 lang/rust/avro/README.md       | 13 +++++++++
 lang/rust/avro/src/de.rs       | 50 +++++++++++++++++++++++++++++++++--
 lang/rust/avro/src/decode.rs   |  2 ++
 lang/rust/avro/src/encode.rs   |  2 ++
 lang/rust/avro/src/error.rs    |  6 +++++
 lang/rust/avro/src/lib.rs      | 13 +++++++++
 lang/rust/avro/src/schema.rs   | 38 ++++++++++++++++++++++++++
 lang/rust/avro/src/types.rs    | 60 ++++++++++++++++++++++++++++++++++++++++++
 lang/rust/avro/tests/schema.rs | 38 ++++++++++++++++++++++++++
 9 files changed, 220 insertions(+), 2 deletions(-)

diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md
index 4155785a2..ad5ec7068 100644
--- a/lang/rust/avro/README.md
+++ b/lang/rust/avro/README.md
@@ -432,6 +432,7 @@ fn main() -> Result<(), Error> {
 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
 1. Date, Time (milli) as `i32` and Time (micro) as `i64`
 1. Timestamp (milli and micro) as `i64`
+1. Local timestamp (milli and micro) as `i64`
 1. Duration as a custom type with `months`, `days` and `millis` accessor 
methods each of which returns an `i32`
 
 Note that the on-disk representation is identical to the underlying 
primitive/complex type.
@@ -499,6 +500,16 @@ fn main() -> Result<(), Error> {
           "type": "long",
           "logicalType": "timestamp-micros"
         },
+        {
+          "name": "local_timestamp_millis",
+          "type": "long",
+          "logicalType": "local-timestamp-millis"
+        },
+        {
+          "name": "local_timestamp_micros",
+          "type": "long",
+          "logicalType": "local-timestamp-micros"
+        },
         {
           "name": "duration",
           "type": {
@@ -527,6 +538,8 @@ fn main() -> Result<(), Error> {
     record.put("time_micros", Value::TimeMicros(3));
     record.put("timestamp_millis", Value::TimestampMillis(4));
     record.put("timestamp_micros", Value::TimestampMicros(5));
+    record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
+    record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
     record.put("duration", Duration::new(Months::new(6), Days::new(7), 
Millis::new(8)));
 
     writer.append(record)?;
diff --git a/lang/rust/avro/src/de.rs b/lang/rust/avro/src/de.rs
index 601a90961..660056448 100644
--- a/lang/rust/avro/src/de.rs
+++ b/lang/rust/avro/src/de.rs
@@ -244,7 +244,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a 
Deserializer<'de> {
             Value::Long(i)
             | Value::TimeMicros(i)
             | Value::TimestampMillis(i)
-            | Value::TimestampMicros(i) => visitor.visit_i64(*i),
+            | Value::TimestampMicros(i)
+            | Value::LocalTimestampMillis(i)
+            | Value::LocalTimestampMicros(i) => visitor.visit_i64(*i),
             &Value::Float(f) => visitor.visit_f32(f),
             &Value::Double(d) => visitor.visit_f64(d),
             Value::Union(_i, u) => match **u {
@@ -254,7 +256,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a 
Deserializer<'de> {
                 Value::Long(i)
                 | Value::TimeMicros(i)
                 | Value::TimestampMillis(i)
-                | Value::TimestampMicros(i) => visitor.visit_i64(i),
+                | Value::TimestampMicros(i)
+                | Value::LocalTimestampMillis(i)
+                | Value::LocalTimestampMicros(i) => visitor.visit_i64(i),
                 Value::Float(f) => visitor.visit_f32(f),
                 Value::Double(d) => visitor.visit_f64(d),
                 Value::Record(ref fields) => 
visitor.visit_map(RecordDeserializer::new(fields)),
@@ -1073,6 +1077,24 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_avro_3853_local_timestamp_millis() -> TestResult {
+        let raw_value = 1;
+        let value = Value::LocalTimestampMillis(raw_value);
+        let result = crate::from_value::<i64>(&value)?;
+        assert_eq!(result, raw_value);
+        Ok(())
+    }
+
+    #[test]
+    fn test_avro_3853_local_timestamp_micros() -> TestResult {
+        let raw_value = 1;
+        let value = Value::LocalTimestampMicros(raw_value);
+        let result = crate::from_value::<i64>(&value)?;
+        assert_eq!(result, raw_value);
+        Ok(())
+    }
+
     #[test]
     fn test_from_value_uuid_str() -> TestResult {
         let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49";
@@ -1116,6 +1138,8 @@ mod tests {
             ("time_micros_a".to_string(), 123),
             ("timestamp_millis_b".to_string(), 234),
             ("timestamp_micros_c".to_string(), 345),
+            ("local_timestamp_millis_d".to_string(), 678),
+            ("local_timestamp_micros_e".to_string(), 789),
         ]
         .iter()
         .cloned()
@@ -1132,6 +1156,12 @@ mod tests {
                 key if key.starts_with("timestamp_micros_") => {
                     (k.clone(), Value::TimestampMicros(*v))
                 }
+                key if key.starts_with("local_timestamp_millis_") => {
+                    (k.clone(), Value::LocalTimestampMillis(*v))
+                }
+                key if key.starts_with("local_timestamp_micros_") => {
+                    (k.clone(), Value::LocalTimestampMicros(*v))
+                }
                 _ => unreachable!("unexpected key: {:?}", k),
             })
             .collect();
@@ -1181,6 +1211,22 @@ mod tests {
                 "a_non_existing_timestamp_micros".to_string(),
                 Value::Union(0, Box::new(Value::TimestampMicros(-345))),
             ),
+            (
+                "a_local_timestamp_millis".to_string(),
+                Value::Union(0, Box::new(Value::LocalTimestampMillis(678))),
+            ),
+            (
+                "a_non_existing_local_timestamp_millis".to_string(),
+                Value::Union(0, Box::new(Value::LocalTimestampMillis(-678))),
+            ),
+            (
+                "a_local_timestamp_micros".to_string(),
+                Value::Union(0, Box::new(Value::LocalTimestampMicros(789))),
+            ),
+            (
+                "a_non_existing_local_timestamp_micros".to_string(),
+                Value::Union(0, Box::new(Value::LocalTimestampMicros(-789))),
+            ),
             (
                 "a_record".to_string(),
                 Value::Union(
diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index debb38076..b13c76739 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -130,6 +130,8 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
         Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
         Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
         Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
+        Schema::LocalTimestampMillis => 
zag_i64(reader).map(Value::LocalTimestampMillis),
+        Schema::LocalTimestampMicros => 
zag_i64(reader).map(Value::LocalTimestampMicros),
         Schema::Duration => {
             let mut buf = [0u8; 12];
             reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index e8080f04b..6e52e0c3b 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -77,6 +77,8 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
         Value::Long(i)
         | Value::TimestampMillis(i)
         | Value::TimestampMicros(i)
+        | Value::LocalTimestampMillis(i)
+        | Value::LocalTimestampMicros(i)
         | Value::TimeMicros(i) => encode_long(*i, buffer),
         Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
         Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 447d2711a..bf066b8a5 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -151,6 +151,12 @@ pub enum Error {
     #[error("TimestampMicros expected, got {0:?}")]
     GetTimestampMicros(ValueKind),
 
+    #[error("LocalTimestampMillis expected, got {0:?}")]
+    GetLocalTimestampMillis(ValueKind),
+
+    #[error("LocalTimestampMicros expected, got {0:?}")]
+    GetLocalTimestampMicros(ValueKind),
+
     #[error("Null expected, got {0:?}")]
     GetNull(ValueKind),
 
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index cde0251c5..35b1b431a 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -545,6 +545,7 @@
 //! 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
 //! 1. Date, Time (milli) as `i32` and Time (micro) as `i64`
 //! 1. Timestamp (milli and micro) as `i64`
+//! 1. Local timestamp (milli and micro) as `i64`
 //! 1. Duration as a custom type with `months`, `days` and `millis` accessor 
methods each of which returns an `i32`
 //!
 //! Note that the on-disk representation is identical to the underlying 
primitive/complex type.
@@ -613,6 +614,16 @@
 //!           "logicalType": "timestamp-micros"
 //!         },
 //!         {
+//!           "name": "local_timestamp_millis",
+//!           "type": "long",
+//!           "logicalType": "local-timestamp-millis"
+//!         },
+//!         {
+//!           "name": "local_timestamp_micros",
+//!           "type": "long",
+//!           "logicalType": "local-timestamp-micros"
+//!         },
+//!         {
 //!           "name": "duration",
 //!           "type": {
 //!             "type": "fixed",
@@ -640,6 +651,8 @@
 //!     record.put("time_micros", Value::TimeMicros(3));
 //!     record.put("timestamp_millis", Value::TimestampMillis(4));
 //!     record.put("timestamp_micros", Value::TimestampMicros(5));
+//!     record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
+//!     record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
 //!     record.put("duration", Duration::new(Months::new(6), Days::new(7), 
Millis::new(8)));
 //!
 //!     writer.append(record)?;
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 8d99b1201..5a05885f9 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -127,6 +127,10 @@ pub enum Schema {
     TimestampMillis,
     /// An instant in time represented as the number of microseconds after the 
UNIX epoch.
     TimestampMicros,
+    /// An instant in local time represented as the number of milliseconds 
after the UNIX epoch.
+    LocalTimestampMillis,
+    /// An instant in local time represented as the number of microseconds 
after the UNIX epoch.
+    LocalTimestampMicros,
     /// An amount of time defined by a number of months, days and milliseconds.
     Duration,
     /// A reference to another schema.
@@ -191,6 +195,8 @@ impl From<&types::Value> for SchemaKind {
             Value::TimeMicros(_) => Self::TimeMicros,
             Value::TimestampMillis(_) => Self::TimestampMillis,
             Value::TimestampMicros(_) => Self::TimestampMicros,
+            Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
+            Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
             Value::Duration { .. } => Self::Duration,
         }
     }
@@ -1388,6 +1394,26 @@ impl Parser {
                         enclosing_namespace,
                     );
                 }
+                "local-timestamp-millis" => {
+                    return try_logical_type(
+                        "local-timestamp-millis",
+                        complex,
+                        &[SchemaKind::Long],
+                        Schema::LocalTimestampMillis,
+                        self,
+                        enclosing_namespace,
+                    );
+                }
+                "local-timestamp-micros" => {
+                    return try_logical_type(
+                        "local-timestamp-micros",
+                        complex,
+                        &[SchemaKind::Long],
+                        Schema::LocalTimestampMicros,
+                        self,
+                        enclosing_namespace,
+                    );
+                }
                 "duration" => {
                     logical_verify_type(complex, &[SchemaKind::Fixed], self, 
enclosing_namespace)?;
                     return Ok(Schema::Duration);
@@ -1901,6 +1927,18 @@ impl Serialize for Schema {
                 map.serialize_entry("logicalType", "timestamp-micros")?;
                 map.end()
             }
+            Schema::LocalTimestampMillis => {
+                let mut map = serializer.serialize_map(None)?;
+                map.serialize_entry("type", "long")?;
+                map.serialize_entry("logicalType", "local-timestamp-millis")?;
+                map.end()
+            }
+            Schema::LocalTimestampMicros => {
+                let mut map = serializer.serialize_map(None)?;
+                map.serialize_entry("type", "long")?;
+                map.serialize_entry("logicalType", "local-timestamp-micros")?;
+                map.end()
+            }
             Schema::Duration => {
                 let mut map = serializer.serialize_map(None)?;
 
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 11bf63564..9bb607705 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -108,6 +108,10 @@ pub enum Value {
     TimestampMillis(i64),
     /// Timestamp in microseconds.
     TimestampMicros(i64),
+    /// Local timestamp in milliseconds.
+    LocalTimestampMillis(i64),
+    /// Local timestamp in microseconds.
+    LocalTimestampMicros(i64),
     /// Avro Duration. An amount of time defined by months, days and 
milliseconds.
     Duration(Duration),
     /// Universally unique identifier.
@@ -327,6 +331,8 @@ impl TryFrom<Value> for JsonValue {
             Value::TimeMicros(t) => Ok(Self::Number(t.into())),
             Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
             Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
+            Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
+            Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
             Value::Duration(d) => Ok(Self::Array(
                 <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
             )),
@@ -409,8 +415,12 @@ impl Value {
             (&Value::Long(_), &Schema::TimeMicros) => None,
             (&Value::Long(_), &Schema::TimestampMillis) => None,
             (&Value::Long(_), &Schema::TimestampMicros) => None,
+            (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
+            (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
             (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
             (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
+            (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) 
=> None,
+            (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) 
=> None,
             (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
             (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
             (&Value::Date(_), &Schema::Date) => None,
@@ -669,6 +679,8 @@ impl Value {
             Schema::TimeMicros => self.resolve_time_micros(),
             Schema::TimestampMillis => self.resolve_timestamp_millis(),
             Schema::TimestampMicros => self.resolve_timestamp_micros(),
+            Schema::LocalTimestampMillis => 
self.resolve_local_timestamp_millis(),
+            Schema::LocalTimestampMicros => 
self.resolve_local_timestamp_micros(),
             Schema::Duration => self.resolve_duration(),
             Schema::Uuid => self.resolve_uuid(),
         }
@@ -784,6 +796,26 @@ impl Value {
         }
     }
 
+    fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
+        match self {
+            Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
+                Ok(Value::LocalTimestampMillis(ts))
+            }
+            Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
+            other => Err(Error::GetLocalTimestampMillis(other.into())),
+        }
+    }
+
+    fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
+        match self {
+            Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
+                Ok(Value::LocalTimestampMicros(ts))
+            }
+            Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
+            other => Err(Error::GetLocalTimestampMicros(other.into())),
+        }
+    }
+
     fn resolve_null(self) -> Result<Self, Error> {
         match self {
             Value::Null => Ok(Value::Null),
@@ -1681,6 +1713,26 @@ Field with name '"b"' is not a member of the map items"#,
         assert!(value.resolve(&Schema::TimestampMicros).is_err());
     }
 
+    #[test]
+    fn test_avro_3853_resolve_timestamp_millis() {
+        let value = Value::LocalTimestampMillis(10);
+        assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
+        assert!(value.resolve(&Schema::Float).is_err());
+
+        let value = Value::Float(10.0f32);
+        assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
+    }
+
+    #[test]
+    fn test_avro_3853_resolve_timestamp_micros() {
+        let value = Value::LocalTimestampMicros(10);
+        assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
+        assert!(value.resolve(&Schema::Int).is_err());
+
+        let value = Value::Double(10.0);
+        assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
+    }
+
     #[test]
     fn resolve_duration() {
         let value = Value::Duration(Duration::new(
@@ -1886,6 +1938,14 @@ Field with name '"b"' is not a member of the map items"#,
             JsonValue::try_from(Value::TimestampMicros(1))?,
             JsonValue::Number(1.into())
         );
+        assert_eq!(
+            JsonValue::try_from(Value::LocalTimestampMillis(1))?,
+            JsonValue::Number(1.into())
+        );
+        assert_eq!(
+            JsonValue::try_from(Value::LocalTimestampMicros(1))?,
+            JsonValue::Number(1.into())
+        );
         assert_eq!(
             JsonValue::try_from(Value::Duration(
                 [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 
12u8].into()
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index 1d2cb8b4d..63b730560 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -592,6 +592,42 @@ const TIMESTAMPMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[
     ),
 ];
 
+const LOCAL_TIMESTAMPMILLIS_LOGICAL_TYPE: &[(&str, bool)] = &[
+    (
+        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
+        true,
+    ),
+    // this is valid even though its logical type is "local-timestamp-milis" 
(missing the second "l"), because
+    // unknown logical types are ignored
+    (
+        r#"{"type": "long", "logicalType": "local-timestamp-milis"}"#,
+        true,
+    ),
+    (
+        // this is still valid because a logicalType that does not fit the underlying type ("int") is ignored
+        r#"{"type": "int", "logicalType": "local-timestamp-millis"}"#,
+        true,
+    ),
+];
+
+const LOCAL_TIMESTAMPMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[
+    (
+        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
+        true,
+    ),
+    // this is valid even though its logical type is "local-timestamp-micro" 
(missing the last "s"), because
+    // unknown logical types are ignored
+    (
+        r#"{"type": "long", "logicalType": "local-timestamp-micro"}"#,
+        true,
+    ),
+    (
+        // this is still valid because a logicalType that does not fit the underlying type ("int") is ignored
+        r#"{"type": "int", "logicalType": "local-timestamp-micros"}"#,
+        true,
+    ),
+];
+
 lazy_static! {
     static ref EXAMPLES: Vec<(&'static str, bool)> = Vec::new()
         .iter()
@@ -612,6 +648,8 @@ lazy_static! {
         .chain(TIMEMICROS_LOGICAL_TYPE.iter().copied())
         .chain(TIMESTAMPMILLIS_LOGICAL_TYPE.iter().copied())
         .chain(TIMESTAMPMICROS_LOGICAL_TYPE.iter().copied())
+        .chain(LOCAL_TIMESTAMPMILLIS_LOGICAL_TYPE.iter().copied())
+        .chain(LOCAL_TIMESTAMPMICROS_LOGICAL_TYPE.iter().copied())
         .collect();
     static ref VALID_EXAMPLES: Vec<(&'static str, bool)> =
         EXAMPLES.iter().copied().filter(|s| s.1).collect();

Reply via email to