This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b7cdcb  ARROW-11055: [Rust] [DataFusion] Support date_trunc function
4b7cdcb is described below

commit 4b7cdcb9220b6d94b251aef32c21ef9b4097ecfa
Author: Pavel Tiunov <[email protected]>
AuthorDate: Sat Jan 2 07:53:06 2021 -0500

    ARROW-11055: [Rust] [DataFusion] Support date_trunc function
    
    `date_trunc` SQL function implementation and GROUP BY timestamp support.
    
    Closes #9040 from paveltiunov/date-trunc
    
    Authored-by: Pavel Tiunov <[email protected]>
    Signed-off-by: Andrew Lamb <[email protected]>
---
 rust/arrow/src/csv/reader.rs                       |  34 +++++
 rust/datafusion/src/execution/context.rs           |  51 +++++++
 .../src/physical_plan/datetime_expressions.rs      | 161 +++++++++++++++++++++
 rust/datafusion/src/physical_plan/functions.rs     |  13 ++
 rust/datafusion/src/physical_plan/group_scalar.rs  |   4 +
 .../datafusion/src/physical_plan/hash_aggregate.rs |  23 ++-
 rust/datafusion/src/physical_plan/hash_join.rs     |  17 ++-
 rust/datafusion/src/physical_plan/type_coercion.rs |   3 +-
 rust/datafusion/src/scalar.rs                      |  26 +++-
 rust/datafusion/src/test/mod.rs                    |  20 ++-
 10 files changed, 346 insertions(+), 6 deletions(-)

diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index 3fca7b2..c9f97cd 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -449,6 +449,16 @@ fn parse(
                 &DataType::Date64(_) => {
                     build_primitive_array::<Date64Type>(line_number, rows, i)
                 }
+                &DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                    build_primitive_array::<TimestampMicrosecondType>(
+                        line_number,
+                        rows,
+                        i,
+                    )
+                }
+                &DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                    build_primitive_array::<TimestampNanosecondType>(line_number, rows, i)
+                }
                 &DataType::Utf8 => Ok(Arc::new(
                     rows.iter().map(|row| row.get(i)).collect::<StringArray>(),
                 ) as ArrayRef),
@@ -531,6 +541,30 @@ impl Parser for Date64Type {
     }
 }
 
+impl Parser for TimestampNanosecondType {
+    fn parse(string: &str) -> Option<i64> {
+        match Self::DATA_TYPE {
+            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+                let date_time = string.parse::<chrono::NaiveDateTime>().ok()?;
+                Self::Native::from_i64(date_time.timestamp_nanos())
+            }
+            _ => None,
+        }
+    }
+}
+
+impl Parser for TimestampMicrosecondType {
+    fn parse(string: &str) -> Option<i64> {
+        match Self::DATA_TYPE {
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                let date_time = string.parse::<chrono::NaiveDateTime>().ok()?;
+                Self::Native::from_i64(date_time.timestamp_nanos() / 1000)
+            }
+            _ => None,
+        }
+    }
+}
+
 fn parse_item<T: Parser>(string: &str) -> Option<T::Native> {
     T::parse(string)
 }
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index dae9c90..ebfc887 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -1055,6 +1055,57 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn group_by_date_trunc() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let mut ctx = ExecutionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c2", DataType::UInt64, false),
+            Field::new(
+                "t1",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                false,
+            ),
+        ]));
+
+        // generate a partitioned file
+        for partition in 0..4 {
+            let filename = format!("partition-{}.{}", partition, "csv");
+            let file_path = tmp_dir.path().join(&filename);
+            let mut file = File::create(file_path)?;
+
+            // generate some data
+            for i in 0..10 {
+                let data = format!("{},2020-12-{}T00:00:00.000\n", i, i + 10);
+                file.write_all(data.as_bytes())?;
+            }
+        }
+
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema).has_header(false),
+        )?;
+
+        let results = plan_and_collect(
+            &mut ctx,
+            "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)"
+        ).await?;
+        assert_eq!(results.len(), 1);
+
+        let batch = &results[0];
+
+        assert_eq!(field_names(batch), vec!["week", "SUM(c2)"]);
+
+        let expected: Vec<&str> =
+            vec!["2020-12-07T00:00:00,24", "2020-12-14T00:00:00,156"];
+        let mut rows = test::format_batch(&batch);
+        rows.sort();
+        assert_eq!(rows, expected);
+
+        Ok(())
+    }
+
     async fn run_count_distinct_integers_aggregated_scenario(
         partitions: Vec<Vec<(&str, u64)>>,
     ) -> Result<Vec<RecordBatch>> {
diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs
index a12b00c..247f899 100644
--- a/rust/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs
@@ -25,6 +25,7 @@ use arrow::{
     buffer::Buffer,
     datatypes::{DataType, TimeUnit, ToByteSlice},
 };
+use chrono::Duration;
 use chrono::{prelude::*, LocalResult};
 
 #[inline]
@@ -205,6 +206,108 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
     Ok(TimestampNanosecondArray::from(Arc::new(data)))
 }
 
+/// date_trunc SQL function
+pub fn date_trunc(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
+    let granularity_array =
+        &args[0]
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Could not cast date_trunc granularity input to StringArray"
+                        .to_string(),
+                )
+            })?;
+
+    let array = &args[1]
+        .as_any()
+        .downcast_ref::<TimestampNanosecondArray>()
+        .ok_or_else(|| {
+            DataFusionError::Execution(
+                "Could not cast date_trunc array input to TimestampNanosecondArray"
+                    .to_string(),
+            )
+        })?;
+
+    let range = 0..array.len();
+    let result = range
+        .map(|i| {
+            if array.is_null(i) {
+                Ok(0_i64)
+            } else {
+                let date_time = match granularity_array.value(i) {
+                    "second" => array
+                        .value_as_datetime(i)
+                        .and_then(|d| d.with_nanosecond(0)),
+                    "minute" => array
+                        .value_as_datetime(i)
+                        .and_then(|d| d.with_nanosecond(0))
+                        .and_then(|d| d.with_second(0)),
+                    "hour" => array
+                        .value_as_datetime(i)
+                        .and_then(|d| d.with_nanosecond(0))
+                        .and_then(|d| d.with_second(0))
+                        .and_then(|d| d.with_minute(0)),
+                    "day" => array
+                        .value_as_datetime(i)
+                        .and_then(|d| d.with_nanosecond(0))
+                        .and_then(|d| d.with_second(0))
+                        .and_then(|d| d.with_minute(0))
+                        .and_then(|d| d.with_hour(0)),
+                    "week" => array
+                        .value_as_datetime(i)
+                        .and_then(|d| d.with_nanosecond(0))
+                        .and_then(|d| d.with_second(0))
+                        .and_then(|d| d.with_minute(0))
+                        .and_then(|d| d.with_hour(0))
+                        .map(|d| {
+                            d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)
+                        }),
+                    "month" => array
+                        .value_as_datetime(i)
+                        .and_then(|d| d.with_nanosecond(0))
+                        .and_then(|d| d.with_second(0))
+                        .and_then(|d| d.with_minute(0))
+                        .and_then(|d| d.with_hour(0))
+                        .and_then(|d| d.with_day0(0)),
+                    "year" => array
+                        .value_as_datetime(i)
+                        .and_then(|d| d.with_nanosecond(0))
+                        .and_then(|d| d.with_second(0))
+                        .and_then(|d| d.with_minute(0))
+                        .and_then(|d| d.with_hour(0))
+                        .and_then(|d| d.with_day0(0))
+                        .and_then(|d| d.with_month0(0)),
+                    unsupported => {
+                        return Err(DataFusionError::Execution(format!(
+                            "Unsupported date_trunc granularity: {}",
+                            unsupported
+                        )))
+                    }
+                };
+                date_time.map(|d| d.timestamp_nanos()).ok_or_else(|| {
+                    DataFusionError::Execution(format!(
+                        "Can't truncate date time: {:?}",
+                        array.value_as_datetime(i)
+                    ))
+                })
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let data = ArrayData::new(
+        DataType::Timestamp(TimeUnit::Nanosecond, None),
+        array.len(),
+        Some(array.null_count()),
+        array.data().null_buffer().cloned(),
+        0,
+        vec![Buffer::from(result.to_byte_slice())],
+        vec![],
+    );
+
+    Ok(TimestampNanosecondArray::from(Arc::new(data)))
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -379,6 +482,64 @@ mod tests {
     }
 
     #[test]
+    fn date_trunc_test() -> Result<()> {
+        let mut ts_builder = StringBuilder::new(2);
+        let mut truncated_builder = StringBuilder::new(2);
+        let mut string_builder = StringBuilder::new(2);
+
+        ts_builder.append_null()?;
+        truncated_builder.append_null()?;
+        string_builder.append_value("second")?;
+
+        ts_builder.append_value("2020-09-08T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-09-08T13:42:29.000000Z")?;
+        string_builder.append_value("second")?;
+
+        ts_builder.append_value("2020-09-08T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-09-08T13:42:00.000000Z")?;
+        string_builder.append_value("minute")?;
+
+        ts_builder.append_value("2020-09-08T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-09-08T13:00:00.000000Z")?;
+        string_builder.append_value("hour")?;
+
+        ts_builder.append_value("2020-09-08T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-09-08T00:00:00.000000Z")?;
+        string_builder.append_value("day")?;
+
+        ts_builder.append_value("2020-09-08T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-09-07T00:00:00.000000Z")?;
+        string_builder.append_value("week")?;
+
+        ts_builder.append_value("2020-09-08T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-09-01T00:00:00.000000Z")?;
+        string_builder.append_value("month")?;
+
+        ts_builder.append_value("2020-09-08T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-01-01T00:00:00.000000Z")?;
+        string_builder.append_value("year")?;
+
+        ts_builder.append_value("2021-01-01T13:42:29.190855Z")?;
+        truncated_builder.append_value("2020-12-28T00:00:00.000000Z")?;
+        string_builder.append_value("week")?;
+
+        ts_builder.append_value("2020-01-01T13:42:29.190855Z")?;
+        truncated_builder.append_value("2019-12-30T00:00:00.000000Z")?;
+        string_builder.append_value("week")?;
+
+        let string_array = Arc::new(string_builder.finish());
+        let ts_array = Arc::new(to_timestamp(&[Arc::new(ts_builder.finish())]).unwrap());
+        let date_trunc_array = date_trunc(&[string_array, ts_array])
+            .expect("that to_timestamp parsed values without error");
+
+        let expected_timestamps =
+            to_timestamp(&[Arc::new(truncated_builder.finish())]).unwrap();
+
+        assert_eq!(date_trunc_array, expected_timestamps);
+        Ok(())
+    }
+
+    #[test]
     fn to_timestamp_invalid_input_type() -> Result<()> {
         // pass the wrong type of input array to to_timestamp and test
         // that we get an error.
diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs
index 0023fbf..316586d 100644
--- a/rust/datafusion/src/physical_plan/functions.rs
+++ b/rust/datafusion/src/physical_plan/functions.rs
@@ -130,6 +130,8 @@ pub enum BuiltinScalarFunction {
     Array,
     /// SQL NULLIF()
     NullIf,
+    /// Date truncate
+    DateTrunc,
 }
 
 impl fmt::Display for BuiltinScalarFunction {
@@ -168,6 +170,7 @@ impl FromStr for BuiltinScalarFunction {
             "trim" => BuiltinScalarFunction::Trim,
             "upper" => BuiltinScalarFunction::Upper,
             "to_timestamp" => BuiltinScalarFunction::ToTimestamp,
+            "date_trunc" => BuiltinScalarFunction::DateTrunc,
             "array" => BuiltinScalarFunction::Array,
             "nullif" => BuiltinScalarFunction::NullIf,
             _ => {
@@ -247,6 +250,9 @@ pub fn return_type(
         BuiltinScalarFunction::ToTimestamp => {
             Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
         }
+        BuiltinScalarFunction::DateTrunc => {
+            Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
+        }
         BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
             Box::new(Field::new("item", arg_types[0].clone(), true)),
             arg_types.len() as i32,
@@ -317,6 +323,9 @@ pub fn create_physical_expr(
         BuiltinScalarFunction::ToTimestamp => {
             |args| Ok(Arc::new(datetime_expressions::to_timestamp(args)?))
         }
+        BuiltinScalarFunction::DateTrunc => {
+            |args| Ok(Arc::new(datetime_expressions::date_trunc(args)?))
+        }
         BuiltinScalarFunction::Array => |args| Ok(array_expressions::array(args)?),
     });
     // coerce
@@ -355,6 +364,10 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
             Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
         }
         BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1, vec![DataType::Utf8]),
+        BuiltinScalarFunction::DateTrunc => Signature::Exact(vec![
+            DataType::Utf8,
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+        ]),
         BuiltinScalarFunction::Array => {
             Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec())
         }
diff --git a/rust/datafusion/src/physical_plan/group_scalar.rs b/rust/datafusion/src/physical_plan/group_scalar.rs
index 8c11a6b..295eb46 100644
--- a/rust/datafusion/src/physical_plan/group_scalar.rs
+++ b/rust/datafusion/src/physical_plan/group_scalar.rs
@@ -35,6 +35,8 @@ pub(crate) enum GroupByScalar {
     Int32(i32),
     Int64(i64),
     Utf8(Box<String>),
+    TimeMicrosecond(i64),
+    TimeNanosecond(i64),
 }
 
 impl TryFrom<&ScalarValue> for GroupByScalar {
@@ -87,6 +89,8 @@ impl From<&GroupByScalar> for ScalarValue {
             GroupByScalar::UInt32(v) => ScalarValue::UInt32(Some(*v)),
             GroupByScalar::UInt64(v) => ScalarValue::UInt64(Some(*v)),
             GroupByScalar::Utf8(v) => ScalarValue::Utf8(Some(v.to_string())),
+            GroupByScalar::TimeMicrosecond(v) => ScalarValue::TimeMicrosecond(Some(*v)),
+            GroupByScalar::TimeNanosecond(v) => ScalarValue::TimeNanosecond(Some(*v)),
         }
     }
 }
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index 9677246..864bc78 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -30,7 +30,7 @@ use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{Accumulator, AggregateExpr};
 use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};
 
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
 use arrow::{
@@ -49,6 +49,7 @@ use super::{
 use ahash::RandomState;
 use hashbrown::HashMap;
 
+use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray};
 use async_trait::async_trait;
 
 /// Hash aggregate modes
@@ -672,6 +673,12 @@ fn create_batch_from_map(
                     GroupByScalar::Utf8(str) => {
                         Arc::new(StringArray::from(vec![&***str]))
                     }
+                    GroupByScalar::TimeMicrosecond(n) => {
+                        Arc::new(TimestampMicrosecondArray::from(vec![*n]))
+                    }
+                    GroupByScalar::TimeNanosecond(n) => {
+                        Arc::new(TimestampNanosecondArray::from_vec(vec![*n], None))
+                    }
                 })
                 .collect::<Vec<ArrayRef>>();
 
@@ -780,6 +787,20 @@ pub(crate) fn create_group_by_values(
                 let array = col.as_any().downcast_ref::<StringArray>().unwrap();
                 vec[i] = GroupByScalar::Utf8(Box::new(array.value(row).into()))
             }
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                let array = col
+                    .as_any()
+                    .downcast_ref::<TimestampMicrosecondArray>()
+                    .unwrap();
+                vec[i] = GroupByScalar::TimeMicrosecond(array.value(row))
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+                let array = col
+                    .as_any()
+                    .downcast_ref::<TimestampNanosecondArray>()
+                    .unwrap();
+                vec[i] = GroupByScalar::TimeNanosecond(array.value(row))
+            }
             _ => {
                 // This is internal because we should have caught this before.
                 return Err(DataFusionError::Internal(
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs
index 9ac7447..8a3ee4b 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -18,6 +18,7 @@
 //! Defines the join plan for executing partitions in parallel and then joining the results
 //! into a set of partitions.
 
+use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray};
 use arrow::{array::ArrayRef, compute};
 use std::sync::Arc;
 use std::{any::Any, collections::HashSet};
@@ -28,7 +29,7 @@ use hashbrown::HashMap;
 use tokio::sync::Mutex;
 
 use arrow::array::{make_array, Array, MutableArrayData};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, TimeUnit};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
@@ -373,6 +374,20 @@ pub(crate) fn create_key(
                 let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
                 vec.extend(array.value(row).to_le_bytes().iter());
             }
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                let array = col
+                    .as_any()
+                    .downcast_ref::<TimestampMicrosecondArray>()
+                    .unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+                let array = col
+                    .as_any()
+                    .downcast_ref::<TimestampNanosecondArray>()
+                    .unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
             DataType::Utf8 => {
                 let array = col.as_any().downcast_ref::<StringArray>().unwrap();
                 let value = array.value(row);
diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs
index 91eaf65..090729a 100644
--- a/rust/datafusion/src/physical_plan/type_coercion.rs
+++ b/rust/datafusion/src/physical_plan/type_coercion.rs
@@ -31,7 +31,7 @@
 
 use std::sync::Arc;
 
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Schema, TimeUnit};
 
 use super::{functions::Signature, PhysicalExpr};
 use crate::error::{DataFusionError, Result};
@@ -176,6 +176,7 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
                 | Float32
                 | Float64
         ),
+        Timestamp(TimeUnit::Nanosecond, None) => matches!(type_from, Timestamp(_, None)),
         Utf8 => true,
         _ => false,
     }
diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs
index 5afcabd..64e5bcc 100644
--- a/rust/datafusion/src/scalar.rs
+++ b/rust/datafusion/src/scalar.rs
@@ -20,8 +20,9 @@
 use std::{convert::TryFrom, fmt, sync::Arc};
 
 use arrow::array::{
-    Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, UInt16Builder,
-    UInt32Builder, UInt64Builder, UInt8Builder,
+    Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder,
+    TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Builder, UInt32Builder,
+    UInt64Builder, UInt8Builder,
 };
 use arrow::{
     array::ArrayRef,
@@ -37,6 +38,7 @@ use arrow::{
 };
 
 use crate::error::{DataFusionError, Result};
+use arrow::datatypes::TimeUnit;
 
 /// Represents a dynamically typed, nullable single value.
 /// This is the single-valued counter-part of arrow’s `Array`.
@@ -72,6 +74,10 @@ pub enum ScalarValue {
     List(Option<Vec<ScalarValue>>, DataType),
     /// Date stored as a signed 32bit int
     Date32(Option<i32>),
+    /// Timestamp Microseconds
+    TimeMicrosecond(Option<i64>),
+    /// Timestamp Nanoseconds
+    TimeNanosecond(Option<i64>),
 }
 
 macro_rules! typed_cast {
@@ -131,6 +137,12 @@ impl ScalarValue {
             ScalarValue::Int16(_) => DataType::Int16,
             ScalarValue::Int32(_) => DataType::Int32,
             ScalarValue::Int64(_) => DataType::Int64,
+            ScalarValue::TimeMicrosecond(_) => {
+                DataType::Timestamp(TimeUnit::Microsecond, None)
+            }
+            ScalarValue::TimeNanosecond(_) => {
+                DataType::Timestamp(TimeUnit::Nanosecond, None)
+            }
             ScalarValue::Float32(_) => DataType::Float32,
             ScalarValue::Float64(_) => DataType::Float64,
             ScalarValue::Utf8(_) => DataType::Utf8,
@@ -205,6 +217,12 @@ impl ScalarValue {
             ScalarValue::UInt16(e) => Arc::new(UInt16Array::from(vec![*e; size])),
             ScalarValue::UInt32(e) => Arc::new(UInt32Array::from(vec![*e; size])),
             ScalarValue::UInt64(e) => Arc::new(UInt64Array::from(vec![*e; size])),
+            ScalarValue::TimeMicrosecond(e) => {
+                Arc::new(TimestampMicrosecondArray::from(vec![*e]))
+            }
+            ScalarValue::TimeNanosecond(e) => {
+                Arc::new(TimestampNanosecondArray::from_opt_vec(vec![*e], None))
+            }
             ScalarValue::Utf8(e) => Arc::new(StringArray::from(vec![e.as_deref(); size])),
             ScalarValue::LargeUtf8(e) => {
                 Arc::new(LargeStringArray::from(vec![e.as_deref(); size]))
@@ -440,6 +458,8 @@ impl fmt::Display for ScalarValue {
             ScalarValue::UInt16(e) => format_option!(f, e)?,
             ScalarValue::UInt32(e) => format_option!(f, e)?,
             ScalarValue::UInt64(e) => format_option!(f, e)?,
+            ScalarValue::TimeMicrosecond(e) => format_option!(f, e)?,
+            ScalarValue::TimeNanosecond(e) => format_option!(f, e)?,
             ScalarValue::Utf8(e) => format_option!(f, e)?,
             ScalarValue::LargeUtf8(e) => format_option!(f, e)?,
             ScalarValue::List(e, _) => match e {
@@ -473,6 +493,8 @@ impl fmt::Debug for ScalarValue {
             ScalarValue::UInt16(_) => write!(f, "UInt16({})", self),
             ScalarValue::UInt32(_) => write!(f, "UInt32({})", self),
             ScalarValue::UInt64(_) => write!(f, "UInt64({})", self),
+            ScalarValue::TimeMicrosecond(_) => write!(f, "TimeMicrosecond({})", self),
+            ScalarValue::TimeNanosecond(_) => write!(f, "TimeNanosecond({})", self),
             ScalarValue::Utf8(None) => write!(f, "Utf8({})", self),
             ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", self),
             ScalarValue::LargeUtf8(None) => write!(f, "LargeUtf8({})", self),
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index db0ef3b..e589834 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -21,7 +21,7 @@ use crate::datasource::{MemTable, TableProvider};
 use crate::error::Result;
 use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
 use arrow::array::{self, Int32Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use std::fs::File;
 use std::io::prelude::*;
@@ -210,6 +210,24 @@ pub fn format_batch(batch: &RecordBatch) -> Vec<String> {
                         .unwrap()
                         .value(row_index)
                 )),
+                DataType::Timestamp(TimeUnit::Microsecond, _) => s.push_str(&format!(
+                    "{:?}",
+                    array
+                        .as_any()
+                        .downcast_ref::<array::TimestampMicrosecondArray>()
+                        .unwrap()
+                        .value_as_datetime(row_index)
+                        .unwrap()
+                )),
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => s.push_str(&format!(
+                    "{:?}",
+                    array
+                        .as_any()
+                        .downcast_ref::<array::TimestampNanosecondArray>()
+                        .unwrap()
+                        .value_as_datetime(row_index)
+                        .unwrap()
+                )),
                 _ => s.push('?'),
             }
         }

Reply via email to