This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b7a10adbc8 fix: Use dynamic timezone in now() function for accurate 
timestamp (#18017)
b7a10adbc8 is described below

commit b7a10adbc8d404f92bb281f08135f3b84fc42084
Author: Alex Huang <[email protected]>
AuthorDate: Wed Oct 22 12:12:49 2025 +0300

    fix: Use dynamic timezone in now() function for accurate timestamp (#18017)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #17993
    
    ## Rationale for this change
    
    ```
    DataFusion CLI v50.1.0
    > SET TIME ZONE = '+08:00';
    0 row(s) fetched.
    Elapsed 0.011 seconds.
    
    > SELECT arrow_typeof(now());
    +---------------------------------------+
    | arrow_typeof(now())                   |
    +---------------------------------------+
    | Timestamp(Nanosecond, Some("+08:00")) |
    +---------------------------------------+
    1 row(s) fetched.
    Elapsed 0.015 seconds.
    
    > SELECT count(1) result FROM (SELECT now() as n) a WHERE n > 
'2000-01-01'::date;
    +--------+
    | result |
    +--------+
    | 1      |
    +--------+
    1 row(s) fetched.
    Elapsed 0.029 seconds.
    ```
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    When the timezone changes, re-register `now()` function
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/execution/context/mod.rs      | 20 +++++++++++++++
 datafusion/core/src/execution/session_state.rs    | 29 +++++++++++++++++----
 datafusion/core/tests/expr_api/simplification.rs  |  3 +--
 datafusion/core/tests/optimizer/mod.rs            |  3 ++-
 datafusion/expr/src/udf.rs                        | 31 +++++++++++++++++++++++
 datafusion/functions/src/datetime/mod.rs          |  8 ++++--
 datafusion/functions/src/datetime/now.rs          | 28 ++++++++++++++++----
 datafusion/functions/src/macros.rs                | 26 +++++++++++++++++++
 datafusion/sqllogictest/test_files/timestamps.slt | 15 +++++++++++
 9 files changed, 148 insertions(+), 15 deletions(-)

diff --git a/datafusion/core/src/execution/context/mod.rs 
b/datafusion/core/src/execution/context/mod.rs
index eace5610e1..87dc18be5b 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1079,6 +1079,26 @@ impl SessionContext {
         } else {
             let mut state = self.state.write();
             state.config_mut().options_mut().set(&variable, &value)?;
+
+            // Re-initialize any UDFs that depend on configuration
+            // This allows both built-in and custom functions to respond to 
configuration changes
+            let config_options = state.config().options();
+
+            // Collect updated UDFs in a separate vector
+            let udfs_to_update: Vec<_> = state
+                .scalar_functions()
+                .values()
+                .filter_map(|udf| {
+                    udf.inner()
+                        .with_updated_config(config_options)
+                        .map(Arc::new)
+                })
+                .collect();
+
+            for udf in udfs_to_update {
+                state.register_udf(udf)?;
+            }
+
             drop(state);
         }
 
diff --git a/datafusion/core/src/execution/session_state.rs 
b/datafusion/core/src/execution/session_state.rs
index 6749ddd7ab..b3b336f560 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -1418,12 +1418,31 @@ impl SessionStateBuilder {
         }
 
         if let Some(scalar_functions) = scalar_functions {
-            scalar_functions.into_iter().for_each(|udf| {
-                let existing_udf = state.register_udf(udf);
-                if let Ok(Some(existing_udf)) = existing_udf {
-                    debug!("Overwrote an existing UDF: {}", 
existing_udf.name());
+            for udf in scalar_functions {
+                let config_options = state.config().options();
+                match udf.inner().with_updated_config(config_options) {
+                    Some(new_udf) => {
+                        if let Err(err) = 
state.register_udf(Arc::new(new_udf)) {
+                            debug!(
+                                "Failed to re-register updated UDF '{}': {}",
+                                udf.name(),
+                                err
+                            );
+                        }
+                    }
+                    None => match state.register_udf(Arc::clone(&udf)) {
+                        Ok(Some(existing)) => {
+                            debug!("Overwrote existing UDF '{}'", 
existing.name());
+                        }
+                        Ok(None) => {
+                            debug!("Registered UDF '{}'", udf.name());
+                        }
+                        Err(err) => {
+                            debug!("Failed to register UDF '{}': {}", 
udf.name(), err);
+                        }
+                    },
                 }
-            });
+            }
         }
 
         if let Some(aggregate_functions) = aggregate_functions {
diff --git a/datafusion/core/tests/expr_api/simplification.rs 
b/datafusion/core/tests/expr_api/simplification.rs
index 89651726a6..572a7e2b33 100644
--- a/datafusion/core/tests/expr_api/simplification.rs
+++ b/datafusion/core/tests/expr_api/simplification.rs
@@ -514,8 +514,7 @@ fn multiple_now() -> Result<()> {
     // expect the same timestamp appears in both exprs
     let actual = get_optimized_plan_formatted(plan, &time);
     let expected = format!(
-        "Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), 
TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
-            \n  TableScan: test",
+        "Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), 
TimestampNanosecond({}, Some(\"+00:00\")) AS t2\n  TableScan: test",
         time.timestamp_nanos_opt().unwrap(),
         time.timestamp_nanos_opt().unwrap()
     );
diff --git a/datafusion/core/tests/optimizer/mod.rs 
b/datafusion/core/tests/optimizer/mod.rs
index 9899a0158f..aec32d0562 100644
--- a/datafusion/core/tests/optimizer/mod.rs
+++ b/datafusion/core/tests/optimizer/mod.rs
@@ -144,8 +144,9 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
     let statement = &ast[0];
 
     // create a logical query plan
+    let config = ConfigOptions::default();
     let context_provider = MyContextProvider::default()
-        .with_udf(datetime::now())
+        .with_udf(datetime::now(&config))
         .with_udf(datafusion_functions::core::arrow_cast())
         .with_udf(datafusion_functions::string::concat())
         .with_udf(datafusion_functions::string::concat_ws());
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index de81ec5f0b..c4cd8c006d 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -546,6 +546,33 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + 
Sync {
     /// [`DataFusionError::Internal`]: 
datafusion_common::DataFusionError::Internal
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
 
+    /// Create a new instance of this function with updated configuration.
+    ///
+    /// This method is called when configuration options change at runtime
+    /// (e.g., via `SET` statements) to allow functions that depend on
+    /// configuration to update themselves accordingly.
+    ///
+    /// Note the current [`ConfigOptions`] are also passed to 
[`Self::invoke_with_args`] so
+    /// this API is not needed for functions where the values may
+    /// depend on the current options.
+    ///
+    /// This API is useful for functions where the return
+    /// **type** depends on the configuration options, such as the `now()` 
function
+    /// which depends on the current timezone.
+    ///
+    /// # Arguments
+    ///
+    /// * `config` - The updated configuration options
+    ///
+    /// # Returns
+    ///
+    /// * `Some(ScalarUDF)` - A new instance of this function configured with 
the new settings
+    /// * `None` - If this function does not change with new configuration 
settings (the default)
+    ///
+    fn with_updated_config(&self, _config: &ConfigOptions) -> 
Option<ScalarUDF> {
+        None
+    }
+
     /// What type will be returned by this function, given the arguments?
     ///
     /// By default, this function calls [`Self::return_type`] with the
@@ -879,6 +906,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
         self.inner.invoke_with_args(args)
     }
 
+    fn with_updated_config(&self, _config: &ConfigOptions) -> 
Option<ScalarUDF> {
+        None
+    }
+
     fn aliases(&self) -> &[String] {
         &self.aliases
     }
diff --git a/datafusion/functions/src/datetime/mod.rs 
b/datafusion/functions/src/datetime/mod.rs
index 5729b1edae..d80f14facf 100644
--- a/datafusion/functions/src/datetime/mod.rs
+++ b/datafusion/functions/src/datetime/mod.rs
@@ -45,7 +45,6 @@ make_udf_function!(date_part::DatePartFunc, date_part);
 make_udf_function!(date_trunc::DateTruncFunc, date_trunc);
 make_udf_function!(make_date::MakeDateFunc, make_date);
 make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime);
-make_udf_function!(now::NowFunc, now);
 make_udf_function!(to_char::ToCharFunc, to_char);
 make_udf_function!(to_date::ToDateFunc, to_date);
 make_udf_function!(to_local_time::ToLocalTimeFunc, to_local_time);
@@ -56,6 +55,9 @@ make_udf_function!(to_timestamp::ToTimestampMillisFunc, 
to_timestamp_millis);
 make_udf_function!(to_timestamp::ToTimestampMicrosFunc, to_timestamp_micros);
 make_udf_function!(to_timestamp::ToTimestampNanosFunc, to_timestamp_nanos);
 
+// create UDF with config
+make_udf_function_with_config!(now::NowFunc, now);
+
 // we cannot currently use the export_functions macro since it doesn't handle
 // functions with varargs currently
 
@@ -91,6 +93,7 @@ pub mod expr_fn {
     ),(
         now,
         "returns the current timestamp in nanoseconds, using the same value 
for all instances of now() in same statement",
+        @config
     ),
     (
         to_local_time,
@@ -255,6 +258,7 @@ pub mod expr_fn {
 
 /// Returns all DataFusion functions defined in this package
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
+    use datafusion_common::config::ConfigOptions;
     vec![
         current_date(),
         current_time(),
@@ -263,7 +267,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
         date_trunc(),
         from_unixtime(),
         make_date(),
-        now(),
+        now(&ConfigOptions::default()),
         to_char(),
         to_date(),
         to_local_time(),
diff --git a/datafusion/functions/src/datetime/now.rs 
b/datafusion/functions/src/datetime/now.rs
index 65dadb42a8..96a35c241f 100644
--- a/datafusion/functions/src/datetime/now.rs
+++ b/datafusion/functions/src/datetime/now.rs
@@ -19,12 +19,14 @@ use arrow::datatypes::DataType::Timestamp;
 use arrow::datatypes::TimeUnit::Nanosecond;
 use arrow::datatypes::{DataType, Field, FieldRef};
 use std::any::Any;
+use std::sync::Arc;
 
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
 use datafusion_expr::{
-    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, 
Signature,
-    Volatility,
+    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDF, 
ScalarUDFImpl,
+    Signature, Volatility,
 };
 use datafusion_macros::user_doc;
 
@@ -41,19 +43,30 @@ The `now()` return value is determined at query time and 
will return the same ti
 pub struct NowFunc {
     signature: Signature,
     aliases: Vec<String>,
+    timezone: Option<Arc<str>>,
 }
 
 impl Default for NowFunc {
     fn default() -> Self {
-        Self::new()
+        Self::new_with_config(&ConfigOptions::default())
     }
 }
 
 impl NowFunc {
+    #[deprecated(since = "50.2.0", note = "use `new_with_config` instead")]
     pub fn new() -> Self {
         Self {
             signature: Signature::nullary(Volatility::Stable),
             aliases: vec!["current_timestamp".to_string()],
+            timezone: Some(Arc::from("+00")),
+        }
+    }
+
+    pub fn new_with_config(config: &ConfigOptions) -> Self {
+        Self {
+            signature: Signature::nullary(Volatility::Stable),
+            aliases: vec!["current_timestamp".to_string()],
+            timezone: Some(Arc::from(config.execution.time_zone.as_str())),
         }
     }
 }
@@ -77,10 +90,14 @@ impl ScalarUDFImpl for NowFunc {
         &self.signature
     }
 
+    fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> 
{
+        Some(Self::new_with_config(config).into())
+    }
+
     fn return_field_from_args(&self, _args: ReturnFieldArgs) -> 
Result<FieldRef> {
         Ok(Field::new(
             self.name(),
-            Timestamp(Nanosecond, Some("+00:00".into())),
+            Timestamp(Nanosecond, self.timezone.clone()),
             false,
         )
         .into())
@@ -106,8 +123,9 @@ impl ScalarUDFImpl for NowFunc {
             .execution_props()
             .query_execution_start_time
             .timestamp_nanos_opt();
+
         Ok(ExprSimplifyResult::Simplified(Expr::Literal(
-            ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())),
+            ScalarValue::TimestampNanosecond(now_ts, self.timezone.clone()),
             None,
         )))
     }
diff --git a/datafusion/functions/src/macros.rs 
b/datafusion/functions/src/macros.rs
index 228d704e29..9e195f2d52 100644
--- a/datafusion/functions/src/macros.rs
+++ b/datafusion/functions/src/macros.rs
@@ -40,6 +40,7 @@
 /// Exported functions accept:
 /// - `Vec<Expr>` argument (single argument followed by a comma)
 /// - Variable number of `Expr` arguments (zero or more arguments, must be 
without commas)
+/// - Functions that require config (marked with `@config` prefix)
 #[macro_export]
 macro_rules! export_functions {
     ($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),*) => {
@@ -49,6 +50,15 @@ macro_rules! export_functions {
         )*
     };
 
+    // function that requires config (marked with @config)
+    (single $FUNC:ident, $DOC:expr, @config) => {
+        #[doc = $DOC]
+        pub fn $FUNC() -> datafusion_expr::Expr {
+            use datafusion_common::config::ConfigOptions;
+            super::$FUNC(&ConfigOptions::default()).call(vec![])
+        }
+    };
+
     // single vector argument (a single argument followed by a comma)
     (single $FUNC:ident, $DOC:expr, $arg:ident,) => {
         #[doc = $DOC]
@@ -89,6 +99,22 @@ macro_rules! make_udf_function {
     };
 }
 
+/// Creates a singleton `ScalarUDF` of the `$UDF` function and a function
+/// named `$NAME` which returns that singleton. The function takes a
+/// configuration argument of type `$CONFIG_TYPE` to create the UDF.
+#[macro_export]
+macro_rules! make_udf_function_with_config {
+    ($UDF:ty, $NAME:ident) => {
+        #[allow(rustdoc::redundant_explicit_links)]
+        #[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) 
implementation of ", stringify!($NAME))]
+        pub fn $NAME(config: &datafusion_common::config::ConfigOptions) -> 
std::sync::Arc<datafusion_expr::ScalarUDF> {
+            std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
+                <$UDF>::new_with_config(&config),
+            ))
+        }
+    };
+}
+
 /// Macro creates a sub module if the feature is not enabled
 ///
 /// The rationale for providing stub functions is to help users to configure 
datafusion
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt 
b/datafusion/sqllogictest/test_files/timestamps.slt
index ebc79533f1..6fe9995c7b 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -73,6 +73,21 @@ true
 ##########
 ## Current time Tests
 ##########
+statement ok
+SET TIME ZONE = '+08'
+
+query T
+select arrow_typeof(now());
+----
+Timestamp(Nanosecond, Some("+08"))
+
+query I
+SELECT count(1) result FROM (SELECT now() as n) a WHERE n > '2000-01-01'::date;
+----
+1
+
+statement ok
+SET TIME ZONE = '+00'
 
 query B
 select cast(now() as time) = current_time();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to