This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6af2bf675 Support timezones in CSV reader (#3841) (#3908)
6af2bf675 is described below

commit 6af2bf67582b52efdbe05a4902e68ad938dd14b7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 23 21:22:33 2023 +0000

    Support timezones in CSV reader (#3841) (#3908)
---
 arrow-csv/src/reader/mod.rs | 192 ++++++++++++++++++++++++++------------------
 1 file changed, 115 insertions(+), 77 deletions(-)

diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 046bfafc4..262c057d4 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -45,8 +45,9 @@ mod records;
 use arrow_array::builder::PrimitiveBuilder;
 use arrow_array::types::*;
 use arrow_array::*;
-use arrow_cast::parse::{parse_decimal, Parser};
+use arrow_cast::parse::{parse_decimal, string_to_datetime, Parser};
 use arrow_schema::*;
+use chrono::{TimeZone, Utc};
 use lazy_static::lazy_static;
 use regex::{Regex, RegexSet};
 use std::fmt;
@@ -56,6 +57,7 @@ use std::sync::Arc;
 
 use crate::map_csv_error;
 use crate::reader::records::{RecordDecoder, StringRecords};
+use arrow_array::timezone::Tz;
 use csv::StringRecord;
 
 lazy_static! {
@@ -677,33 +679,36 @@ fn parse(
                 >(
                     line_number, rows, i, None
                 ),
-                DataType::Timestamp(TimeUnit::Second, _) => build_primitive_array::<
-                    TimestampSecondType,
-                >(
-                    line_number, rows, i, None
-                ),
-                DataType::Timestamp(TimeUnit::Millisecond, _) => {
-                    build_primitive_array::<TimestampMillisecondType>(
+                DataType::Timestamp(TimeUnit::Second, tz) => {
+                    build_timestamp_array::<TimestampSecondType>(
                         line_number,
                         rows,
                         i,
-                        None,
+                        tz.as_deref(),
                     )
                 }
-                DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                    build_primitive_array::<TimestampMicrosecondType>(
+                DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+                    build_timestamp_array::<TimestampMillisecondType>(
                         line_number,
                         rows,
                         i,
-                        None,
+                        tz.as_deref(),
                     )
                 }
-                DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                    build_primitive_array::<TimestampNanosecondType>(
+                DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+                    build_timestamp_array::<TimestampMicrosecondType>(
                         line_number,
                         rows,
                         i,
-                        None,
+                        tz.as_deref(),
+                    )
+                }
+                DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+                    build_timestamp_array::<TimestampNanosecondType>(
+                        line_number,
+                        rows,
+                        i,
+                        tz.as_deref(),
                     )
                 }
                 DataType::Utf8 => Ok(Arc::new(
@@ -871,6 +876,54 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
         .map(|e| Arc::new(e) as ArrayRef)
 }
 
+fn build_timestamp_array<T: ArrowTimestampType>(
+    line_number: usize,
+    rows: &StringRecords<'_>,
+    col_idx: usize,
+    timezone: Option<&str>,
+) -> Result<ArrayRef, ArrowError> {
+    Ok(Arc::new(match timezone {
+        Some(timezone) => {
+            let tz: Tz = timezone.parse()?;
+            build_timestamp_array_impl::<T, _>(line_number, rows, col_idx, &tz)?
+                .with_timezone(timezone)
+        }
+        None => build_timestamp_array_impl::<T, _>(line_number, rows, col_idx, &Utc)?,
+    }))
+}
+
+fn build_timestamp_array_impl<T: ArrowTimestampType, Tz: TimeZone>(
+    line_number: usize,
+    rows: &StringRecords<'_>,
+    col_idx: usize,
+    timezone: &Tz,
+) -> Result<PrimitiveArray<T>, ArrowError> {
+    rows.iter()
+        .enumerate()
+        .map(|(row_index, row)| {
+            let s = row.get(col_idx);
+            if s.is_empty() {
+                return Ok(None);
+            }
+
+            let date = string_to_datetime(timezone, s).map_err(|e| {
+                ArrowError::ParseError(format!(
+                    "Error parsing column {col_idx} at line {}: {}",
+                    line_number + row_index,
+                    e
+                ))
+            })?;
+
+            Ok(Some(match T::UNIT {
+                TimeUnit::Second => date.timestamp(),
+                TimeUnit::Millisecond => date.timestamp_millis(),
+                TimeUnit::Microsecond => date.timestamp_micros(),
+                TimeUnit::Nanosecond => date.timestamp_nanos(),
+            }))
+        })
+        .collect()
+}
+
 // parses a specific column (col_idx) into an Arrow Array.
 fn build_boolean_array(
     line_number: usize,
@@ -1147,7 +1200,6 @@ mod tests {
     use tempfile::NamedTempFile;
 
     use arrow_array::cast::AsArray;
-    use chrono::prelude::*;
 
     #[test]
     fn test_csv() {
@@ -1686,75 +1738,61 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_parse_timestamp_microseconds() {
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("1970-01-01T00:00:00Z").unwrap(),
-            0
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 0).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("2018-11-13T17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("2018-11-13 17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 11000000).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("2018-11-13T17:11:10.011").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(1900, 2, 28).unwrap(),
-            NaiveTime::from_hms_nano_opt(12, 34, 56, 0).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("1900-02-28T12:34:56").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
+    fn test_parse_timestamp_impl<T: ArrowTimestampType>(
+        timezone: Option<String>,
+        expected: &[i64],
+    ) {
+        let csv = [
+            "1970-01-01T00:00:00",
+            "1970-01-01T00:00:00Z",
+            "1970-01-01T00:00:00+02:00",
+        ]
+        .join("\n");
+        let mut decoder = ReaderBuilder::new()
+            .with_schema(Arc::new(Schema::new(vec![Field::new(
+                "field",
+                DataType::Timestamp(T::UNIT, timezone.clone()),
+                true,
+            )])))
+            .build_decoder();
+
+        let decoded = decoder.decode(csv.as_bytes()).unwrap();
+        assert_eq!(decoded, csv.len());
+        decoder.decode(&[]).unwrap();
+
+        let batch = decoder.flush().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 3);
+        let col = batch.column(0).as_primitive::<T>();
+        assert_eq!(col.values(), expected);
+        assert_eq!(col.data_type(), &DataType::Timestamp(T::UNIT, timezone));
     }
 
     #[test]
-    fn test_parse_timestamp_nanoseconds() {
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("1970-01-01T00:00:00Z").unwrap(),
-            0
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 0).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("2018-11-13T17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos()
+    fn test_parse_timestamp() {
+        test_parse_timestamp_impl::<TimestampNanosecondType>(
+            None,
+            &[0, 0, -7_200_000_000_000],
         );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("2018-11-13 17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos()
+        test_parse_timestamp_impl::<TimestampNanosecondType>(
+            Some("+00:00".to_string()),
+            &[0, 0, -7_200_000_000_000],
         );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 11000000).unwrap(),
+        test_parse_timestamp_impl::<TimestampNanosecondType>(
+            Some("-05:00".to_string()),
+            &[18_000_000_000_000, 0, -7_200_000_000_000],
         );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("2018-11-13T17:11:10.011").unwrap(),
-            naive_datetime.timestamp_nanos()
+        test_parse_timestamp_impl::<TimestampMicrosecondType>(
+            Some("-03".to_string()),
+            &[10_800_000_000, 0, -7_200_000_000],
         );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(1900, 2, 28).unwrap(),
-            NaiveTime::from_hms_nano_opt(12, 34, 56, 0).unwrap(),
+        test_parse_timestamp_impl::<TimestampMillisecondType>(
+            Some("-03".to_string()),
+            &[10_800_000, 0, -7_200_000],
         );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("1900-02-28T12:34:56").unwrap(),
-            naive_datetime.timestamp_nanos()
+        test_parse_timestamp_impl::<TimestampSecondType>(
+            Some("-03".to_string()),
+            &[10_800, 0, -7_200],
         );
     }
 

Reply via email to