codetyri0n commented on code in PR #18417: URL: https://github.com/apache/datafusion/pull/18417#discussion_r2498835291
########## datafusion/functions/src/datetime/extract.rs: ########## @@ -0,0 +1,525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::str::FromStr; +use std::sync::Arc; + +use arrow::array::timezone::Tz; +use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, PrimitiveBuilder}; +use arrow::compute::kernels::cast_utils::IntervalUnit; +use arrow::compute::{binary, date_part, DatePart}; +use arrow::datatypes::DataType::{ + Date32, Date64, Duration, Interval, Time32, Time64, Timestamp, +}; +use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; +use arrow::datatypes::{ + ArrowTimestampType, DataType, Field, FieldRef, Int32Type, TimeUnit, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, +}; +use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc}; +use datafusion_common::cast::as_primitive_array; +use datafusion_common::types::{logical_date, NativeType}; +use std::ops::Add; + +use datafusion_common::{ + cast::{ + as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array, + as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array, + as_timestamp_microsecond_array, as_timestamp_millisecond_array, + as_timestamp_nanosecond_array, as_timestamp_second_array, + }, + exec_err, internal_datafusion_err, internal_err, not_impl_err, + types::logical_string, + utils::take_function_args, + Result, ScalarValue, +}; +use datafusion_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, +}; +use datafusion_expr_common::signature::{Coercion, TypeSignatureClass}; +use datafusion_macros::user_doc; + +#[user_doc( + doc_section(label = "Time and Date Functions"), + description = "Returns the specified part of the date as an integer.", + syntax_example = "extract(field FROM source)", + argument( + name = "field", + description = r#"Part of the date to return. The following date parts are supported: + +- year +- quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in) +- month +- week (week of the year) +- day (day of the month) +- hour +- minute +- second +- millisecond +- microsecond +- nanosecond +- dow (day of the week where Sunday is 0) +- doy (day of the year) +- epoch (seconds since Unix epoch) +- isodow (day of the week where Monday is 0) +"# + ), + argument( + name = "source", + description = "Time expression to operate on. Can be a constant, column, or function." + ) +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct ExtractFunc { + signature: Signature, +} + +impl Default for ExtractFunc { + fn default() -> Self { + Self::new() + } +} + +impl ExtractFunc { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_implicit( + TypeSignatureClass::Timestamp, + // Not consistent with Postgres and DuckDB but to avoid regression we implicit cast string to timestamp + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Timestamp(Nanosecond, None), + ), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Native(logical_date())), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Time), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Interval), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Duration), + ]), + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for ExtractFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "extract" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + internal_err!("return_field_from_args should be called instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> { + let [field, _] = take_function_args(self.name(), args.scalar_arguments)?; + + field + .and_then(|sv| { + sv.try_as_str() + .flatten() + .filter(|s| !s.is_empty()) + .map(|part| { + if is_epoch(part) { + Field::new(self.name(), DataType::Float64, true) + } else { + Field::new(self.name(), DataType::Int32, true) + } + }) + }) + .map(Arc::new) + .map_or_else( + || exec_err!("{} requires non-empty constant string", self.name()), + Ok, + ) + } + + fn invoke_with_args( + &self, + args: datafusion_expr::ScalarFunctionArgs, + ) -> Result<ColumnarValue> { + let config = &args.config_options; + let args = args.args; + let [part, array] = take_function_args(self.name(), args)?; + + let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part { + v + } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part { + v + } else { + return exec_err!("First argument of `EXTRACT` must be non-null scalar Utf8"); + }; + + let is_scalar = matches!(array, ColumnarValue::Scalar(_)); + + let array = match array { + ColumnarValue::Array(array) => Arc::clone(&array), + ColumnarValue::Scalar(scalar) => scalar.to_array()?, + }; + + let (is_timezone_aware, tz_str_opt) = match array.data_type() { + Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))), + _ => (false, None), + }; + + // Adjust timestamps for extraction + let array = if is_timezone_aware { + // For timezone-aware timestamps, extract in their own timezone + let tz_str = tz_str_opt.as_ref().unwrap(); + let tz = match tz_str.parse::<Tz>() { + Ok(tz) => tz, + Err(_) => return exec_err!("Invalid timezone"), + }; + match array.data_type() { + Timestamp(time_unit, _) => match time_unit { + Nanosecond => { + adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)? + } + Microsecond => { + adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)? + } + Millisecond => { + adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)? + } + Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?, + }, + _ => array, + } + } else if let Timestamp(time_unit, None) = array.data_type() { + // For naive timestamps, interpret in session timezone + let tz = match config.execution.time_zone.parse::<Tz>() { + Ok(tz) => tz, + Err(_) => return exec_err!("Invalid timezone"), + }; + match time_unit { + Nanosecond => { + adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)? + } + Microsecond => { + adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)? + } + Millisecond => { + adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)? + } + Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?, + } + } else { + array + }; + + let part_trim = part_normalization(&part); + + // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds") + // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow + let mut arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) { + match interval_unit { + IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?, + IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?, + IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?, + IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?, + IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?, + IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?, + IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?, + IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?, + IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?, + IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?, + // century and decade are not supported by `DatePart`, although they are supported in postgres + _ => return exec_err!("Date part '{part}' not supported"), + } + } else { + // special cases that can be extracted (in postgres) but are not interval units + match part_trim.to_lowercase().as_str() { + "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?, + "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?, + "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?, + "isodow" => date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?, + "epoch" => epoch(array.as_ref())?, Review Comment: thanks for catching this! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
