This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 6af2bf675 Support timezones in CSV reader (#3841) (#3908)
6af2bf675 is described below
commit 6af2bf67582b52efdbe05a4902e68ad938dd14b7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 23 21:22:33 2023 +0000
Support timezones in CSV reader (#3841) (#3908)
---
arrow-csv/src/reader/mod.rs | 192 ++++++++++++++++++++++++++------------------
1 file changed, 115 insertions(+), 77 deletions(-)
diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 046bfafc4..262c057d4 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -45,8 +45,9 @@ mod records;
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::types::*;
use arrow_array::*;
-use arrow_cast::parse::{parse_decimal, Parser};
+use arrow_cast::parse::{parse_decimal, string_to_datetime, Parser};
use arrow_schema::*;
+use chrono::{TimeZone, Utc};
use lazy_static::lazy_static;
use regex::{Regex, RegexSet};
use std::fmt;
@@ -56,6 +57,7 @@ use std::sync::Arc;
use crate::map_csv_error;
use crate::reader::records::{RecordDecoder, StringRecords};
+use arrow_array::timezone::Tz;
use csv::StringRecord;
lazy_static! {
@@ -677,33 +679,36 @@ fn parse(
>(
line_number, rows, i, None
),
- DataType::Timestamp(TimeUnit::Second, _) => build_primitive_array::<
- TimestampSecondType,
- >(
- line_number, rows, i, None
- ),
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- build_primitive_array::<TimestampMillisecondType>(
+ DataType::Timestamp(TimeUnit::Second, tz) => {
+ build_timestamp_array::<TimestampSecondType>(
line_number,
rows,
i,
- None,
+ tz.as_deref(),
)
}
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- build_primitive_array::<TimestampMicrosecondType>(
+ DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+ build_timestamp_array::<TimestampMillisecondType>(
line_number,
rows,
i,
- None,
+ tz.as_deref(),
)
}
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- build_primitive_array::<TimestampNanosecondType>(
+ DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+ build_timestamp_array::<TimestampMicrosecondType>(
line_number,
rows,
i,
- None,
+ tz.as_deref(),
+ )
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+ build_timestamp_array::<TimestampNanosecondType>(
+ line_number,
+ rows,
+ i,
+ tz.as_deref(),
)
}
DataType::Utf8 => Ok(Arc::new(
@@ -871,6 +876,54 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
.map(|e| Arc::new(e) as ArrayRef)
}
+fn build_timestamp_array<T: ArrowTimestampType>(
+ line_number: usize,
+ rows: &StringRecords<'_>,
+ col_idx: usize,
+ timezone: Option<&str>,
+) -> Result<ArrayRef, ArrowError> {
+ Ok(Arc::new(match timezone {
+ Some(timezone) => {
+ let tz: Tz = timezone.parse()?;
+ build_timestamp_array_impl::<T, _>(line_number, rows, col_idx, &tz)?
+ .with_timezone(timezone)
+ }
+ None => build_timestamp_array_impl::<T, _>(line_number, rows, col_idx, &Utc)?,
+ }))
+}
+
+fn build_timestamp_array_impl<T: ArrowTimestampType, Tz: TimeZone>(
+ line_number: usize,
+ rows: &StringRecords<'_>,
+ col_idx: usize,
+ timezone: &Tz,
+) -> Result<PrimitiveArray<T>, ArrowError> {
+ rows.iter()
+ .enumerate()
+ .map(|(row_index, row)| {
+ let s = row.get(col_idx);
+ if s.is_empty() {
+ return Ok(None);
+ }
+
+ let date = string_to_datetime(timezone, s).map_err(|e| {
+ ArrowError::ParseError(format!(
+ "Error parsing column {col_idx} at line {}: {}",
+ line_number + row_index,
+ e
+ ))
+ })?;
+
+ Ok(Some(match T::UNIT {
+ TimeUnit::Second => date.timestamp(),
+ TimeUnit::Millisecond => date.timestamp_millis(),
+ TimeUnit::Microsecond => date.timestamp_micros(),
+ TimeUnit::Nanosecond => date.timestamp_nanos(),
+ }))
+ })
+ .collect()
+}
+
// parses a specific column (col_idx) into an Arrow Array.
fn build_boolean_array(
line_number: usize,
@@ -1147,7 +1200,6 @@ mod tests {
use tempfile::NamedTempFile;
use arrow_array::cast::AsArray;
- use chrono::prelude::*;
#[test]
fn test_csv() {
@@ -1686,75 +1738,61 @@ mod tests {
);
}
- #[test]
- fn test_parse_timestamp_microseconds() {
- assert_eq!(
- parse_item::<TimestampMicrosecondType>("1970-01-01T00:00:00Z").unwrap(),
- 0
- );
- let naive_datetime = NaiveDateTime::new(
- NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
- NaiveTime::from_hms_nano_opt(17, 11, 10, 0).unwrap(),
- );
- assert_eq!(
- parse_item::<TimestampMicrosecondType>("2018-11-13T17:11:10").unwrap(),
- naive_datetime.timestamp_nanos() / 1000
- );
- assert_eq!(
- parse_item::<TimestampMicrosecondType>("2018-11-13 17:11:10").unwrap(),
- naive_datetime.timestamp_nanos() / 1000
- );
- let naive_datetime = NaiveDateTime::new(
- NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
- NaiveTime::from_hms_nano_opt(17, 11, 10, 11000000).unwrap(),
- );
- assert_eq!(
- parse_item::<TimestampMicrosecondType>("2018-11-13T17:11:10.011").unwrap(),
- naive_datetime.timestamp_nanos() / 1000
- );
- let naive_datetime = NaiveDateTime::new(
- NaiveDate::from_ymd_opt(1900, 2, 28).unwrap(),
- NaiveTime::from_hms_nano_opt(12, 34, 56, 0).unwrap(),
- );
- assert_eq!(
- parse_item::<TimestampMicrosecondType>("1900-02-28T12:34:56").unwrap(),
- naive_datetime.timestamp_nanos() / 1000
- );
+ fn test_parse_timestamp_impl<T: ArrowTimestampType>(
+ timezone: Option<String>,
+ expected: &[i64],
+ ) {
+ let csv = [
+ "1970-01-01T00:00:00",
+ "1970-01-01T00:00:00Z",
+ "1970-01-01T00:00:00+02:00",
+ ]
+ .join("\n");
+ let mut decoder = ReaderBuilder::new()
+ .with_schema(Arc::new(Schema::new(vec![Field::new(
+ "field",
+ DataType::Timestamp(T::UNIT, timezone.clone()),
+ true,
+ )])))
+ .build_decoder();
+
+ let decoded = decoder.decode(csv.as_bytes()).unwrap();
+ assert_eq!(decoded, csv.len());
+ decoder.decode(&[]).unwrap();
+
+ let batch = decoder.flush().unwrap().unwrap();
+ assert_eq!(batch.num_columns(), 1);
+ assert_eq!(batch.num_rows(), 3);
+ let col = batch.column(0).as_primitive::<T>();
+ assert_eq!(col.values(), expected);
+ assert_eq!(col.data_type(), &DataType::Timestamp(T::UNIT, timezone));
}
#[test]
- fn test_parse_timestamp_nanoseconds() {
- assert_eq!(
- parse_item::<TimestampNanosecondType>("1970-01-01T00:00:00Z").unwrap(),
- 0
- );
- let naive_datetime = NaiveDateTime::new(
- NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
- NaiveTime::from_hms_nano_opt(17, 11, 10, 0).unwrap(),
- );
- assert_eq!(
- parse_item::<TimestampNanosecondType>("2018-11-13T17:11:10").unwrap(),
- naive_datetime.timestamp_nanos()
+ fn test_parse_timestamp() {
+ test_parse_timestamp_impl::<TimestampNanosecondType>(
+ None,
+ &[0, 0, -7_200_000_000_000],
);
- assert_eq!(
- parse_item::<TimestampNanosecondType>("2018-11-13 17:11:10").unwrap(),
- naive_datetime.timestamp_nanos()
+ test_parse_timestamp_impl::<TimestampNanosecondType>(
+ Some("+00:00".to_string()),
+ &[0, 0, -7_200_000_000_000],
);
- let naive_datetime = NaiveDateTime::new(
- NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
- NaiveTime::from_hms_nano_opt(17, 11, 10, 11000000).unwrap(),
+ test_parse_timestamp_impl::<TimestampNanosecondType>(
+ Some("-05:00".to_string()),
+ &[18_000_000_000_000, 0, -7_200_000_000_000],
);
- assert_eq!(
- parse_item::<TimestampNanosecondType>("2018-11-13T17:11:10.011").unwrap(),
- naive_datetime.timestamp_nanos()
+ test_parse_timestamp_impl::<TimestampMicrosecondType>(
+ Some("-03".to_string()),
+ &[10_800_000_000, 0, -7_200_000_000],
);
- let naive_datetime = NaiveDateTime::new(
- NaiveDate::from_ymd_opt(1900, 2, 28).unwrap(),
- NaiveTime::from_hms_nano_opt(12, 34, 56, 0).unwrap(),
+ test_parse_timestamp_impl::<TimestampMillisecondType>(
+ Some("-03".to_string()),
+ &[10_800_000, 0, -7_200_000],
);
- assert_eq!(
- parse_item::<TimestampNanosecondType>("1900-02-28T12:34:56").unwrap(),
- naive_datetime.timestamp_nanos()
+ test_parse_timestamp_impl::<TimestampSecondType>(
+ Some("-03".to_string()),
+ &[10_800, 0, -7_200],
);
}