martin-g commented on code in PR #18417:
URL: https://github.com/apache/datafusion/pull/18417#discussion_r2498707344


##########
datafusion/functions/src/datetime/extract.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow::array::timezone::Tz;
+use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, 
PrimitiveBuilder};
+use arrow::compute::kernels::cast_utils::IntervalUnit;
+use arrow::compute::{binary, date_part, DatePart};
+use arrow::datatypes::DataType::{
+    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
+};
+use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
+use arrow::datatypes::{
+    ArrowTimestampType, DataType, Field, FieldRef, Int32Type, TimeUnit,
+    TimestampMicrosecondType, TimestampMillisecondType, 
TimestampNanosecondType,
+    TimestampSecondType,
+};
+use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
+use datafusion_common::cast::as_primitive_array;
+use datafusion_common::types::{logical_date, NativeType};
+use std::ops::Add;
+
+use datafusion_common::{
+    cast::{
+        as_date32_array, as_date64_array, as_int32_array, 
as_time32_millisecond_array,
+        as_time32_second_array, as_time64_microsecond_array, 
as_time64_nanosecond_array,
+        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
+        as_timestamp_nanosecond_array, as_timestamp_second_array,
+    },
+    exec_err, internal_datafusion_err, internal_err, not_impl_err,
+    types::logical_string,
+    utils::take_function_args,
+    Result, ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
+};
+use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
+use datafusion_macros::user_doc;
+
+#[user_doc(
+    doc_section(label = "Time and Date Functions"),
+    description = "Returns the specified part of the date as an integer.",
+    syntax_example = "extract(field FROM source)",
+    argument(
+        name = "field",
+        description = r#"Part of the date to return. The following date parts 
are supported:
+
+- year
+- quarter (emits value in inclusive range [1, 4] based on which quartile of 
the year the date is in)
+- month
+- week (week of the year)
+- day (day of the month)
+- hour
+- minute
+- second
+- millisecond
+- microsecond
+- nanosecond
+- dow (day of the week where Sunday is 0)
+- doy (day of the year)
+- epoch (seconds since Unix epoch)
+- isodow (day of the week where Monday is 0)
+"#
+    ),
+    argument(
+        name = "source",
+        description = "Time expression to operate on. Can be a constant, 
column, or function."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct ExtractFunc {
+    signature: Signature,
+}
+
+impl Default for ExtractFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExtractFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_implicit(
+                            TypeSignatureClass::Timestamp,
+                            // Not consistent with Postgres and DuckDB but to 
avoid regression we implicit cast string to timestamp
+                            vec![TypeSignatureClass::Native(logical_string())],
+                            NativeType::Timestamp(Nanosecond, None),
+                        ),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Time),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Interval),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Duration),
+                    ]),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for ExtractFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "extract"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be called instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
+        let [field, _] = take_function_args(self.name(), 
args.scalar_arguments)?;
+
+        field
+            .and_then(|sv| {
+                sv.try_as_str()
+                    .flatten()
+                    .filter(|s| !s.is_empty())
+                    .map(|part| {
+                        if is_epoch(part) {
+                            Field::new(self.name(), DataType::Float64, true)
+                        } else {
+                            Field::new(self.name(), DataType::Int32, true)
+                        }
+                    })
+            })
+            .map(Arc::new)
+            .map_or_else(
+                || exec_err!("{} requires non-empty constant string", 
self.name()),
+                Ok,
+            )
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        let config = &args.config_options;
+        let args = args.args;
+        let [part, array] = take_function_args(self.name(), args)?;
+
+        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = 
part {
+            v
+        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = 
part {
+            v
+        } else {
+            return exec_err!("First argument of `EXTRACT` must be non-null 
scalar Utf8");
+        };
+
+        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
+
+        let array = match array {
+            ColumnarValue::Array(array) => Arc::clone(&array),
+            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
+        };
+
+        let (is_timezone_aware, tz_str_opt) = match array.data_type() {
+            Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
+            _ => (false, None),
+        };
+
+        // Adjust timestamps for extraction
+        let array = if is_timezone_aware {
+            // For timezone-aware timestamps, extract in their own timezone
+            let tz_str = tz_str_opt.as_ref().unwrap();
+            let tz = match tz_str.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),
+            };
+            match array.data_type() {
+                Timestamp(time_unit, _) => match time_unit {
+                    Nanosecond => {
+                        
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
+                    }
+                    Microsecond => {
+                        
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
+                    }
+                    Millisecond => {
+                        
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
+                    }
+                    Second => 
adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
+                },
+                _ => array,
+            }
+        } else if let Timestamp(time_unit, None) = array.data_type() {
+            // For naive timestamps, interpret in session timezone
+            let tz = match config.execution.time_zone.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),
+            };
+            match time_unit {
+                Nanosecond => {
+                    adjust_timestamp_array::<TimestampNanosecondType>(&array, 
tz)?
+                }
+                Microsecond => {
+                    adjust_timestamp_array::<TimestampMicrosecondType>(&array, 
tz)?
+                }
+                Millisecond => {
+                    adjust_timestamp_array::<TimestampMillisecondType>(&array, 
tz)?
+                }
+                Second => 
adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
+            }
+        } else {
+            array
+        };
+
+        let part_trim = part_normalization(&part);
+
+        // using IntervalUnit here means we hand off all the work of 
supporting plurals (like "seconds")
+        // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
+        let mut arr = if let Ok(interval_unit) = 
IntervalUnit::from_str(part_trim) {
+            match interval_unit {
+                IntervalUnit::Year => date_part(array.as_ref(), 
DatePart::Year)?,
+                IntervalUnit::Month => date_part(array.as_ref(), 
DatePart::Month)?,
+                IntervalUnit::Week => date_part(array.as_ref(), 
DatePart::Week)?,
+                IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
+                IntervalUnit::Hour => date_part(array.as_ref(), 
DatePart::Hour)?,
+                IntervalUnit::Minute => date_part(array.as_ref(), 
DatePart::Minute)?,
+                IntervalUnit::Second => seconds_as_i32(array.as_ref(), 
Second)?,
+                IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), 
Millisecond)?,
+                IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), 
Microsecond)?,
+                IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), 
Nanosecond)?,
+                // century and decade are not supported by `DatePart`, 
although they are supported in postgres
+                _ => return exec_err!("Date part '{part}' not supported"),
+            }
+        } else {
+            // special cases that can be extracted (in postgres) but are not 
interval units
+            match part_trim.to_lowercase().as_str() {
+                "qtr" | "quarter" => date_part(array.as_ref(), 
DatePart::Quarter)?,
+                "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
+                "dow" => date_part(array.as_ref(), 
DatePart::DayOfWeekSunday0)?,
+                "isodow" => date_part(array.as_ref(), 
DatePart::DayOfWeekMonday0)?,
+                "epoch" => epoch(array.as_ref())?,
+                _ => return exec_err!("Date part '{part}' not supported"),
+            }
+        };
+
+        // Special adjustment for hour extraction on timezone-aware timestamps
+        if is_timezone_aware && part_trim.to_lowercase() == "hour" {
+            if let Some(tz_str) = &tz_str_opt {
+                let offset_hours = if tz_str.as_ref() == "+00:00" {
+                    0
+                } else {
+                    let sign = if tz_str.starts_with('+') { 1i32 } else { 
-1i32 };
+                    let hours_str = &tz_str[1..3];

Review Comment:
   Some timezones use offsets like "+01:30" or "-02:45", so you will need 
similar logic to adjust the minutes.
   https://www.timeanddate.com/time/time-zones-interesting.html



##########
datafusion/functions/src/datetime/extract.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow::array::timezone::Tz;
+use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, 
PrimitiveBuilder};
+use arrow::compute::kernels::cast_utils::IntervalUnit;
+use arrow::compute::{binary, date_part, DatePart};
+use arrow::datatypes::DataType::{
+    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
+};
+use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
+use arrow::datatypes::{
+    ArrowTimestampType, DataType, Field, FieldRef, Int32Type, TimeUnit,
+    TimestampMicrosecondType, TimestampMillisecondType, 
TimestampNanosecondType,
+    TimestampSecondType,
+};
+use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
+use datafusion_common::cast::as_primitive_array;
+use datafusion_common::types::{logical_date, NativeType};
+use std::ops::Add;
+
+use datafusion_common::{
+    cast::{
+        as_date32_array, as_date64_array, as_int32_array, 
as_time32_millisecond_array,
+        as_time32_second_array, as_time64_microsecond_array, 
as_time64_nanosecond_array,
+        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
+        as_timestamp_nanosecond_array, as_timestamp_second_array,
+    },
+    exec_err, internal_datafusion_err, internal_err, not_impl_err,
+    types::logical_string,
+    utils::take_function_args,
+    Result, ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
+};
+use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
+use datafusion_macros::user_doc;
+
+#[user_doc(
+    doc_section(label = "Time and Date Functions"),
+    description = "Returns the specified part of the date as an integer.",
+    syntax_example = "extract(field FROM source)",
+    argument(
+        name = "field",
+        description = r#"Part of the date to return. The following date parts 
are supported:
+
+- year
+- quarter (emits value in inclusive range [1, 4] based on which quartile of 
the year the date is in)
+- month
+- week (week of the year)
+- day (day of the month)
+- hour
+- minute
+- second
+- millisecond
+- microsecond
+- nanosecond
+- dow (day of the week where Sunday is 0)
+- doy (day of the year)
+- epoch (seconds since Unix epoch)
+- isodow (day of the week where Monday is 0)
+"#
+    ),
+    argument(
+        name = "source",
+        description = "Time expression to operate on. Can be a constant, 
column, or function."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct ExtractFunc {
+    signature: Signature,
+}
+
+impl Default for ExtractFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExtractFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_implicit(
+                            TypeSignatureClass::Timestamp,
+                            // Not consistent with Postgres and DuckDB but to 
avoid regression we implicit cast string to timestamp
+                            vec![TypeSignatureClass::Native(logical_string())],
+                            NativeType::Timestamp(Nanosecond, None),
+                        ),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Time),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Interval),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Duration),
+                    ]),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for ExtractFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "extract"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be called instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
+        let [field, _] = take_function_args(self.name(), 
args.scalar_arguments)?;
+
+        field
+            .and_then(|sv| {
+                sv.try_as_str()
+                    .flatten()
+                    .filter(|s| !s.is_empty())
+                    .map(|part| {
+                        if is_epoch(part) {
+                            Field::new(self.name(), DataType::Float64, true)
+                        } else {
+                            Field::new(self.name(), DataType::Int32, true)
+                        }
+                    })
+            })
+            .map(Arc::new)
+            .map_or_else(
+                || exec_err!("{} requires non-empty constant string", 
self.name()),
+                Ok,
+            )
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        let config = &args.config_options;
+        let args = args.args;
+        let [part, array] = take_function_args(self.name(), args)?;
+
+        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = 
part {
+            v
+        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = 
part {
+            v
+        } else {
+            return exec_err!("First argument of `EXTRACT` must be non-null 
scalar Utf8");
+        };
+
+        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
+
+        let array = match array {
+            ColumnarValue::Array(array) => Arc::clone(&array),
+            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
+        };
+
+        let (is_timezone_aware, tz_str_opt) = match array.data_type() {
+            Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
+            _ => (false, None),
+        };
+
+        // Adjust timestamps for extraction
+        let array = if is_timezone_aware {
+            // For timezone-aware timestamps, extract in their own timezone
+            let tz_str = tz_str_opt.as_ref().unwrap();
+            let tz = match tz_str.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),
+            };
+            match array.data_type() {
+                Timestamp(time_unit, _) => match time_unit {
+                    Nanosecond => {
+                        
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
+                    }
+                    Microsecond => {
+                        
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
+                    }
+                    Millisecond => {
+                        
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
+                    }
+                    Second => 
adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
+                },
+                _ => array,
+            }
+        } else if let Timestamp(time_unit, None) = array.data_type() {
+            // For naive timestamps, interpret in session timezone
+            let tz = match config.execution.time_zone.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),
+            };
+            match time_unit {
+                Nanosecond => {
+                    adjust_timestamp_array::<TimestampNanosecondType>(&array, 
tz)?
+                }
+                Microsecond => {
+                    adjust_timestamp_array::<TimestampMicrosecondType>(&array, 
tz)?
+                }
+                Millisecond => {
+                    adjust_timestamp_array::<TimestampMillisecondType>(&array, 
tz)?
+                }
+                Second => 
adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
+            }
+        } else {
+            array
+        };
+
+        let part_trim = part_normalization(&part);
+
+        // using IntervalUnit here means we hand off all the work of 
supporting plurals (like "seconds")
+        // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
+        let mut arr = if let Ok(interval_unit) = 
IntervalUnit::from_str(part_trim) {
+            match interval_unit {
+                IntervalUnit::Year => date_part(array.as_ref(), 
DatePart::Year)?,
+                IntervalUnit::Month => date_part(array.as_ref(), 
DatePart::Month)?,
+                IntervalUnit::Week => date_part(array.as_ref(), 
DatePart::Week)?,
+                IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
+                IntervalUnit::Hour => date_part(array.as_ref(), 
DatePart::Hour)?,
+                IntervalUnit::Minute => date_part(array.as_ref(), 
DatePart::Minute)?,
+                IntervalUnit::Second => seconds_as_i32(array.as_ref(), 
Second)?,
+                IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), 
Millisecond)?,
+                IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), 
Microsecond)?,
+                IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), 
Nanosecond)?,
+                // century and decade are not supported by `DatePart`, 
although they are supported in postgres
+                _ => return exec_err!("Date part '{part}' not supported"),
+            }
+        } else {
+            // special cases that can be extracted (in postgres) but are not 
interval units
+            match part_trim.to_lowercase().as_str() {
+                "qtr" | "quarter" => date_part(array.as_ref(), 
DatePart::Quarter)?,
+                "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
+                "dow" => date_part(array.as_ref(), 
DatePart::DayOfWeekSunday0)?,
+                "isodow" => date_part(array.as_ref(), 
DatePart::DayOfWeekMonday0)?,
+                "epoch" => epoch(array.as_ref())?,
+                _ => return exec_err!("Date part '{part}' not supported"),
+            }
+        };
+
+        // Special adjustment for hour extraction on timezone-aware timestamps
+        if is_timezone_aware && part_trim.to_lowercase() == "hour" {
+            if let Some(tz_str) = &tz_str_opt {
+                let offset_hours = if tz_str.as_ref() == "+00:00" {
+                    0
+                } else {
+                    let sign = if tz_str.starts_with('+') { 1i32 } else { 
-1i32 };
+                    let hours_str = &tz_str[1..3];
+                    let hours: i32 = hours_str.parse().unwrap();

Review Comment:
   ```suggestion
                       let hours: i32 = hours_str.parse()?;
   ```
   don't panic on invalid user input



##########
datafusion/functions/src/datetime/date_part.rs:
##########
@@ -193,6 +201,56 @@ impl ScalarUDFImpl for DatePartFunc {
             ColumnarValue::Scalar(scalar) => scalar.to_array()?,
         };
 
+        let (is_timezone_aware, tz_str_opt) = match array.data_type() {
+            Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
+            _ => (false, None),
+        };
+
+        // Adjust timestamps for extraction
+        let array = if is_timezone_aware {

Review Comment:
   The adjustment should not be done for `epoch`



##########
datafusion/functions/src/datetime/extract.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow::array::timezone::Tz;
+use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, 
PrimitiveBuilder};
+use arrow::compute::kernels::cast_utils::IntervalUnit;
+use arrow::compute::{binary, date_part, DatePart};
+use arrow::datatypes::DataType::{
+    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
+};
+use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
+use arrow::datatypes::{
+    ArrowTimestampType, DataType, Field, FieldRef, Int32Type, TimeUnit,
+    TimestampMicrosecondType, TimestampMillisecondType, 
TimestampNanosecondType,
+    TimestampSecondType,
+};
+use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
+use datafusion_common::cast::as_primitive_array;
+use datafusion_common::types::{logical_date, NativeType};
+use std::ops::Add;
+
+use datafusion_common::{
+    cast::{
+        as_date32_array, as_date64_array, as_int32_array, 
as_time32_millisecond_array,
+        as_time32_second_array, as_time64_microsecond_array, 
as_time64_nanosecond_array,
+        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
+        as_timestamp_nanosecond_array, as_timestamp_second_array,
+    },
+    exec_err, internal_datafusion_err, internal_err, not_impl_err,
+    types::logical_string,
+    utils::take_function_args,
+    Result, ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
+};
+use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
+use datafusion_macros::user_doc;
+
+#[user_doc(
+    doc_section(label = "Time and Date Functions"),
+    description = "Returns the specified part of the date as an integer.",
+    syntax_example = "extract(field FROM source)",
+    argument(
+        name = "field",
+        description = r#"Part of the date to return. The following date parts 
are supported:
+
+- year
+- quarter (emits value in inclusive range [1, 4] based on which quartile of 
the year the date is in)
+- month
+- week (week of the year)
+- day (day of the month)
+- hour
+- minute
+- second
+- millisecond
+- microsecond
+- nanosecond
+- dow (day of the week where Sunday is 0)
+- doy (day of the year)
+- epoch (seconds since Unix epoch)
+- isodow (day of the week where Monday is 0)
+"#
+    ),
+    argument(
+        name = "source",
+        description = "Time expression to operate on. Can be a constant, 
column, or function."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct ExtractFunc {
+    signature: Signature,
+}
+
+impl Default for ExtractFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExtractFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_implicit(
+                            TypeSignatureClass::Timestamp,
+                            // Not consistent with Postgres and DuckDB but to 
avoid regression we implicit cast string to timestamp
+                            vec![TypeSignatureClass::Native(logical_string())],
+                            NativeType::Timestamp(Nanosecond, None),
+                        ),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Time),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Interval),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Duration),
+                    ]),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for ExtractFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "extract"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be called instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
+        let [field, _] = take_function_args(self.name(), 
args.scalar_arguments)?;
+
+        field
+            .and_then(|sv| {
+                sv.try_as_str()
+                    .flatten()
+                    .filter(|s| !s.is_empty())
+                    .map(|part| {
+                        if is_epoch(part) {
+                            Field::new(self.name(), DataType::Float64, true)
+                        } else {
+                            Field::new(self.name(), DataType::Int32, true)
+                        }
+                    })
+            })
+            .map(Arc::new)
+            .map_or_else(
+                || exec_err!("{} requires non-empty constant string", 
self.name()),
+                Ok,
+            )
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        let config = &args.config_options;
+        let args = args.args;
+        let [part, array] = take_function_args(self.name(), args)?;
+
+        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = 
part {
+            v
+        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = 
part {
+            v
+        } else {
+            return exec_err!("First argument of `EXTRACT` must be non-null 
scalar Utf8");
+        };
+
+        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
+
+        let array = match array {
+            ColumnarValue::Array(array) => Arc::clone(&array),
+            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
+        };
+
+        let (is_timezone_aware, tz_str_opt) = match array.data_type() {
+            Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
+            _ => (false, None),
+        };
+
+        // Adjust timestamps for extraction
+        let array = if is_timezone_aware {
+            // For timezone-aware timestamps, extract in their own timezone
+            let tz_str = tz_str_opt.as_ref().unwrap();
+            let tz = match tz_str.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),
+            };
+            match array.data_type() {
+                Timestamp(time_unit, _) => match time_unit {
+                    Nanosecond => {
+                        
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
+                    }
+                    Microsecond => {
+                        
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
+                    }
+                    Millisecond => {
+                        
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
+                    }
+                    Second => 
adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
+                },
+                _ => array,
+            }
+        } else if let Timestamp(time_unit, None) = array.data_type() {
+            // For naive timestamps, interpret in session timezone
+            let tz = match config.execution.time_zone.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),

Review Comment:
   ```suggestion
                   Err(_) => return exec_err!("Invalid timezone in config: {}", 
config.execution.time_zone),
   ```



##########
datafusion/functions/src/datetime/extract.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow::array::timezone::Tz;
+use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, 
PrimitiveBuilder};
+use arrow::compute::kernels::cast_utils::IntervalUnit;
+use arrow::compute::{binary, date_part, DatePart};
+use arrow::datatypes::DataType::{
+    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
+};
+use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
+use arrow::datatypes::{
+    ArrowTimestampType, DataType, Field, FieldRef, Int32Type, TimeUnit,
+    TimestampMicrosecondType, TimestampMillisecondType, 
TimestampNanosecondType,
+    TimestampSecondType,
+};
+use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
+use datafusion_common::cast::as_primitive_array;
+use datafusion_common::types::{logical_date, NativeType};
+use std::ops::Add;
+
+use datafusion_common::{
+    cast::{
+        as_date32_array, as_date64_array, as_int32_array, 
as_time32_millisecond_array,
+        as_time32_second_array, as_time64_microsecond_array, 
as_time64_nanosecond_array,
+        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
+        as_timestamp_nanosecond_array, as_timestamp_second_array,
+    },
+    exec_err, internal_datafusion_err, internal_err, not_impl_err,
+    types::logical_string,
+    utils::take_function_args,
+    Result, ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
+};
+use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
+use datafusion_macros::user_doc;
+
+#[user_doc(
+    doc_section(label = "Time and Date Functions"),
+    description = "Returns the specified part of the date as an integer.",
+    syntax_example = "extract(field FROM source)",
+    argument(
+        name = "field",
+        description = r#"Part of the date to return. The following date parts 
are supported:
+
+- year
+- quarter (emits value in inclusive range [1, 4] based on which quartile of 
the year the date is in)
+- month
+- week (week of the year)
+- day (day of the month)
+- hour
+- minute
+- second
+- millisecond
+- microsecond
+- nanosecond
+- dow (day of the week where Sunday is 0)
+- doy (day of the year)
+- epoch (seconds since Unix epoch)
+- isodow (day of the week where Monday is 0)
+"#
+    ),
+    argument(
+        name = "source",
+        description = "Time expression to operate on. Can be a constant, 
column, or function."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct ExtractFunc {
+    signature: Signature,
+}
+
+impl Default for ExtractFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExtractFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_implicit(
+                            TypeSignatureClass::Timestamp,
+                            // Not consistent with Postgres and DuckDB but to 
avoid regression we implicit cast string to timestamp
+                            vec![TypeSignatureClass::Native(logical_string())],
+                            NativeType::Timestamp(Nanosecond, None),
+                        ),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Time),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Interval),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Duration),
+                    ]),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for ExtractFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "extract"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be called instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
+        let [field, _] = take_function_args(self.name(), 
args.scalar_arguments)?;
+
+        field
+            .and_then(|sv| {
+                sv.try_as_str()
+                    .flatten()
+                    .filter(|s| !s.is_empty())
+                    .map(|part| {
+                        if is_epoch(part) {
+                            Field::new(self.name(), DataType::Float64, true)
+                        } else {
+                            Field::new(self.name(), DataType::Int32, true)
+                        }
+                    })
+            })
+            .map(Arc::new)
+            .map_or_else(
+                || exec_err!("{} requires non-empty constant string", 
self.name()),
+                Ok,
+            )
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        let config = &args.config_options;
+        let args = args.args;
+        let [part, array] = take_function_args(self.name(), args)?;
+
+        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = 
part {
+            v
+        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = 
part {
+            v
+        } else {
+            return exec_err!("First argument of `EXTRACT` must be non-null 
scalar Utf8");
+        };
+
+        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
+
+        let array = match array {
+            ColumnarValue::Array(array) => Arc::clone(&array),
+            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
+        };
+
+        let (is_timezone_aware, tz_str_opt) = match array.data_type() {
+            Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
+            _ => (false, None),
+        };
+
+        // Adjust timestamps for extraction
+        let array = if is_timezone_aware {
+            // For timezone-aware timestamps, extract in their own timezone
+            let tz_str = tz_str_opt.as_ref().unwrap();
+            let tz = match tz_str.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),

Review Comment:
   ```suggestion
                   Err(_) => return exec_err!("Invalid timezone: {tz_str}"),
   ```



##########
datafusion/functions/src/datetime/extract.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow::array::timezone::Tz;
+use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, 
PrimitiveBuilder};
+use arrow::compute::kernels::cast_utils::IntervalUnit;
+use arrow::compute::{binary, date_part, DatePart};
+use arrow::datatypes::DataType::{
+    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
+};
+use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
+use arrow::datatypes::{
+    ArrowTimestampType, DataType, Field, FieldRef, Int32Type, TimeUnit,
+    TimestampMicrosecondType, TimestampMillisecondType, 
TimestampNanosecondType,
+    TimestampSecondType,
+};
+use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
+use datafusion_common::cast::as_primitive_array;
+use datafusion_common::types::{logical_date, NativeType};
+use std::ops::Add;
+
+use datafusion_common::{
+    cast::{
+        as_date32_array, as_date64_array, as_int32_array, 
as_time32_millisecond_array,
+        as_time32_second_array, as_time64_microsecond_array, 
as_time64_nanosecond_array,
+        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
+        as_timestamp_nanosecond_array, as_timestamp_second_array,
+    },
+    exec_err, internal_datafusion_err, internal_err, not_impl_err,
+    types::logical_string,
+    utils::take_function_args,
+    Result, ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
+};
+use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
+use datafusion_macros::user_doc;
+
+#[user_doc(
+    doc_section(label = "Time and Date Functions"),
+    description = "Returns the specified part of the date as an integer.",
+    syntax_example = "extract(field FROM source)",
+    argument(
+        name = "field",
+        description = r#"Part of the date to return. The following date parts 
are supported:
+
+- year
+- quarter (emits value in inclusive range [1, 4] based on which quartile of 
the year the date is in)
+- month
+- week (week of the year)
+- day (day of the month)
+- hour
+- minute
+- second
+- millisecond
+- microsecond
+- nanosecond
+- dow (day of the week where Sunday is 0)
+- doy (day of the year)
+- epoch (seconds since Unix epoch)
+- isodow (day of the week where Monday is 0)
+"#
+    ),
+    argument(
+        name = "source",
+        description = "Time expression to operate on. Can be a constant, 
column, or function."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct ExtractFunc {
+    signature: Signature,
+}
+
+impl Default for ExtractFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExtractFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_implicit(
+                            TypeSignatureClass::Timestamp,
+                            // Not consistent with Postgres and DuckDB but to 
avoid regression we implicit cast string to timestamp
+                            vec![TypeSignatureClass::Native(logical_string())],
+                            NativeType::Timestamp(Nanosecond, None),
+                        ),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Time),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Interval),
+                    ]),
+                    TypeSignature::Coercible(vec![
+                        
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+                        Coercion::new_exact(TypeSignatureClass::Duration),
+                    ]),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for ExtractFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "extract"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be called instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
+        let [field, _] = take_function_args(self.name(), 
args.scalar_arguments)?;
+
+        field
+            .and_then(|sv| {
+                sv.try_as_str()
+                    .flatten()
+                    .filter(|s| !s.is_empty())
+                    .map(|part| {
+                        if is_epoch(part) {
+                            Field::new(self.name(), DataType::Float64, true)
+                        } else {
+                            Field::new(self.name(), DataType::Int32, true)
+                        }
+                    })
+            })
+            .map(Arc::new)
+            .map_or_else(
+                || exec_err!("{} requires non-empty constant string", 
self.name()),
+                Ok,
+            )
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        let config = &args.config_options;
+        let args = args.args;
+        let [part, array] = take_function_args(self.name(), args)?;
+
+        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = 
part {
+            v
+        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = 
part {
+            v
+        } else {
+            return exec_err!("First argument of `EXTRACT` must be non-null 
scalar Utf8");
+        };
+
+        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
+
+        let array = match array {
+            ColumnarValue::Array(array) => Arc::clone(&array),
+            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
+        };
+
+        let (is_timezone_aware, tz_str_opt) = match array.data_type() {
+            Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
+            _ => (false, None),
+        };
+
+        // Adjust timestamps for extraction
+        let array = if is_timezone_aware {
+            // For timezone-aware timestamps, extract in their own timezone
+            let tz_str = tz_str_opt.as_ref().unwrap();
+            let tz = match tz_str.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),
+            };
+            match array.data_type() {
+                Timestamp(time_unit, _) => match time_unit {
+                    Nanosecond => {
+                        
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
+                    }
+                    Microsecond => {
+                        
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
+                    }
+                    Millisecond => {
+                        
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
+                    }
+                    Second => 
adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
+                },
+                _ => array,
+            }
+        } else if let Timestamp(time_unit, None) = array.data_type() {
+            // For naive timestamps, interpret in session timezone
+            let tz = match config.execution.time_zone.parse::<Tz>() {
+                Ok(tz) => tz,
+                Err(_) => return exec_err!("Invalid timezone"),
+            };
+            match time_unit {
+                Nanosecond => {
+                    adjust_timestamp_array::<TimestampNanosecondType>(&array, 
tz)?
+                }
+                Microsecond => {
+                    adjust_timestamp_array::<TimestampMicrosecondType>(&array, 
tz)?
+                }
+                Millisecond => {
+                    adjust_timestamp_array::<TimestampMillisecondType>(&array, 
tz)?
+                }
+                Second => 
adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
+            }
+        } else {
+            array
+        };
+
+        let part_trim = part_normalization(&part);
+
+        // using IntervalUnit here means we hand off all the work of 
supporting plurals (like "seconds")
+        // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
+        let mut arr = if let Ok(interval_unit) = 
IntervalUnit::from_str(part_trim) {
+            match interval_unit {
+                IntervalUnit::Year => date_part(array.as_ref(), 
DatePart::Year)?,
+                IntervalUnit::Month => date_part(array.as_ref(), 
DatePart::Month)?,
+                IntervalUnit::Week => date_part(array.as_ref(), 
DatePart::Week)?,
+                IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
+                IntervalUnit::Hour => date_part(array.as_ref(), 
DatePart::Hour)?,
+                IntervalUnit::Minute => date_part(array.as_ref(), 
DatePart::Minute)?,
+                IntervalUnit::Second => seconds_as_i32(array.as_ref(), 
Second)?,
+                IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), 
Millisecond)?,
+                IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), 
Microsecond)?,
+                IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), 
Nanosecond)?,
+                // century and decade are not supported by `DatePart`, 
although they are supported in postgres
+                _ => return exec_err!("Date part '{part}' not supported"),
+            }
+        } else {
+            // special cases that can be extracted (in postgres) but are not 
interval units
+            match part_trim.to_lowercase().as_str() {
+                "qtr" | "quarter" => date_part(array.as_ref(), 
DatePart::Quarter)?,
+                "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
+                "dow" => date_part(array.as_ref(), 
DatePart::DayOfWeekSunday0)?,
+                "isodow" => date_part(array.as_ref(), 
DatePart::DayOfWeekMonday0)?,
+                "epoch" => epoch(array.as_ref())?,

Review Comment:
   epoch is independent of the timezone. 
   IMO you should not do 
https://github.com/apache/datafusion/pull/18417/files#diff-242684458bd101fdc25668efbf108672ab49e5ed53765f4d55a5f7b3f929b72aR214-R244
 for `epoch` seconds



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to