fred1268 opened a new issue, #22614:
URL: https://github.com/apache/datafusion/issues/22614

   ### Describe the bug
   
   ## Problem Statement
   
   In DataFusion, functions are registered into the session state on every 
session creation. All three function categories — scalar, aggregate, and window 
— are collected into fresh `Vec`s and then inserted individually into new 
`HashMap`s. This work is repeated identically for every session, regardless of 
how many sessions are created or how fast they are created.
   
   ### Function counts (at time of writing)
   
   | Category | Count | Source | Config-dependent |
   |---|---|---|---|
   | Scalar UDFs | 115 | `all_default_functions()`: math, string, datetime, regex, crypto, unicode, encoding, core | 6 |
   | Aggregate UDFs | 38 | `all_default_aggregate_functions()` | 0 |
   | Window UDFs | 12 | `all_default_window_functions()` | 0 |
   | **Total** | **165** | | **6** |
   
   ### Per-session initialization cost
   
   Each call to `SessionStateBuilder::build()` — which happens on every session 
creation — pays the following costs:
   
   | Operation | Count | Detail |
   |---|---|---|
   | `Arc::clone` | ~220–250 | into Vecs + into HashMaps + alias clones |
   | `HashMap::insert` | ~180+ | primary names + aliases across all 3 maps |
   | `String` allocs | ~180+ | one heap key per HashMap entry |
   | `Vec`/`HashMap` allocs | ~6 | 3 collector Vecs + 3 result HashMaps |
   
   ### The TaskContext double-copy
   
   Before any physical plan is executed, DataFusion calls 
`session_state.task_ctx()`, which creates a `TaskContext` from the session 
state via `From<&SessionState>`. `TaskContext` owns its own independent copies 
of all three function maps, cloned in full from the session state:
   
   ```rust
   // session_state.rs:2288 — full clone of all three HashMaps
   impl From<&SessionState> for TaskContext {
       fn from(state: &SessionState) -> Self {
           TaskContext::new(
               ...,
               state.scalar_functions.clone(),     // full HashMap clone
               state.aggregate_functions.clone(),  // full HashMap clone
               state.window_functions.clone(),     // full HashMap clone
               ...
           )
       }
   }
   ```
   
   This means the three function maps are materialized **twice**: once when `build()` constructs the `SessionState`, and again, as full clones including every `String` key, each time `task_ctx()` converts it to a `TaskContext`.
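   The difference between the two copy strategies can be seen with standard-library types alone. In this minimal sketch `FnStub` is a hypothetical stand-in for `ScalarUDF`, not a DataFusion type: cloning a `HashMap<String, Arc<_>>` allocates a new table and a new `String` for every key, while cloning an `Arc<HashMap<_, _>>` only increments a reference count.

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   /// Hypothetical stand-in for a function object such as `ScalarUDF`.
   #[derive(Debug)]
   pub struct FnStub {
       pub name: String,
   }

   /// Builds a name-keyed registry, as `build()` does for every session today.
   pub fn build_registry(names: &[&str]) -> HashMap<String, Arc<FnStub>> {
       names
           .iter()
           .map(|n| (n.to_string(), Arc::new(FnStub { name: n.to_string() })))
           .collect()
   }

   /// Today's `task_ctx()` behaviour: a new table, every `String` key
   /// re-allocated, every `Arc` value's reference count incremented.
   pub fn deep_copy(map: &HashMap<String, Arc<FnStub>>) -> HashMap<String, Arc<FnStub>> {
       map.clone()
   }

   /// Proposed behaviour: share the frozen map; one atomic increment, no allocation.
   pub fn share(map: &Arc<HashMap<String, Arc<FnStub>>>) -> Arc<HashMap<String, Arc<FnStub>>> {
       Arc::clone(map)
   }
   ```

   Both strategies share the function objects themselves; the difference lies entirely in the container and its keys.
   
   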
   
   ### The waste: 6 session-dependent functions out of 165
   
   Only 6 scalar UDFs are actually session-dependent, meaning their return 
*type* changes with session configuration (specifically the timezone setting):
   
   | Function | Reason for session-dependency |
   |---|---|
   | `now` / `current_timestamp` | Return type is `Timestamp(ns, timezone)`, and the timezone is per-session |
   | `to_timestamp` | Same |
   | `to_timestamp_seconds` | Same |
   | `to_timestamp_millis` | Same |
   | `to_timestamp_micros` | Same |
   | `to_timestamp_nanos` | Same |
   
   Aggregate and window functions are **structurally incapable** of being 
session-dependent: their impl traits (`AggregateUDFImpl`, `WindowUDFImpl`) do 
not define a `with_updated_config` method at all. This is not a temporary 
limitation — it is a design boundary.
   
   > ⚠️ **Summary:** every session creation rebuilds the registries for all 165 functions, and every query execution clones them again into a `TaskContext`, yet only 6 of those 165 functions actually differ between sessions.
   
   ### To Reproduce
   
   _No response_
   
   ### Expected behavior
   
   Session creation should not rebuild function registries from scratch on 
every call.
   
   The 165 built-in functions are identical across all sessions. Only 6 scalar 
UDFs (`now`, `to_timestamp`, and 4 variants) carry session-specific state (the 
configured timezone in their return type). The remaining 159 functions never 
change.
   
   **Today — per session creation:**
   
   | Operation | Count |
   |---|---|
   | `Arc::clone` | ~220–250 |
   | `HashMap::insert` | ~180+ |
   | `String` heap allocations | ~180+ |
   | `Vec`/`HashMap` allocations | ~6 |
   
   The same cost is paid a second time when `task_ctx()` converts the 
`SessionState` into a `TaskContext` before each physical plan execution.
   
   **Expected — per session creation:**
   
   | Operation | Count |
   |---|---|
   | `Arc::clone` | 3 (one per shared registry) |
   | `HashMap::insert` | ~8 (dynamic map for the 6 config-dependent functions only) |
   | `String` heap allocations | ~8 |
   | `Vec`/`HashMap` allocations | 1 (the small dynamic map) |
   
   The 3 shared registries (`Arc<HashMap>`) are built once per process, on first use, and reused across all sessions. `TaskContext` creation also becomes 3 `Arc::clone`s plus a clone of the small dynamic map, instead of three full HashMap clones.
   
   
   ### Additional context
   
   ## Proposed Design
   
   The idea is to split function initialization into two phases: a **system 
initialization** that runs once per process, and a **session initialization** 
that is reduced to a handful of cheap pointer copies.
   
   | System initialization (once) | Session initialization (per session) |
   |---|---|
   | Build and freeze shared, immutable registries: | 3 `Arc::clone`s — one per shared map. |
   | — all config-independent scalar UDFs → `Arc<HashMap>` | Call `with_updated_config(session_config)` on each prototype → produce 6 per-session scalar instances → insert into a small per-session `HashMap`. |
   | — all aggregate UDFs → `Arc<HashMap>` | `TaskContext` creation: 3 more `Arc::clone`s + clone of the small dynamic map. |
   | — all window UDFs → `Arc<HashMap>` | |
   | — prototype instances of the 6 config-dependent scalars → `Vec<Arc<ScalarUDF>>` | |
   
   ### The routing signal
   
   The `ScalarUDFImpl::with_updated_config` method already exists and already 
expresses exactly the config-dependency concept. The `build()` loop already 
calls it on every scalar UDF. No new trait method is needed: if 
`with_updated_config` returns `Some`, the function is config-dependent and 
routes to the per-session dynamic map; if it returns `None`, it routes to the 
shared static map.
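   A std-only model of this routing rule is sketched below. `Config` and `Udf` are hypothetical stand-ins for `ConfigOptions` and `ScalarUDF`; only the contract of `with_updated_config` (`Some` for config-dependent functions, `None` otherwise) is taken from the issue.

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   /// Hypothetical stand-in for the relevant slice of `ConfigOptions`.
   #[derive(Clone, Debug, Default)]
   pub struct Config {
       pub timezone: Option<String>,
   }

   /// Hypothetical stand-in for `ScalarUDF`; `tz_dependent` marks functions
   /// whose return type embeds the session timezone.
   #[derive(Debug)]
   pub struct Udf {
       pub name: String,
       pub tz_dependent: bool,
       pub timezone: Option<String>,
   }

   impl Udf {
       /// Same contract as `with_updated_config`: a new instance if the
       /// function is config-dependent, `None` otherwise.
       pub fn with_updated_config(&self, config: &Config) -> Option<Udf> {
           self.tz_dependent.then(|| Udf {
               name: self.name.clone(),
               tz_dependent: true,
               timezone: config.timezone.clone(),
           })
       }
   }

   pub type Registry = HashMap<String, Arc<Udf>>;

   /// Routes each function by the signal, then freezes the static map.
   pub fn route(udfs: Vec<Arc<Udf>>, config: &Config) -> (Arc<Registry>, Registry) {
       let mut static_map = Registry::new();
       let mut dynamic_map = Registry::new();
       for udf in udfs {
           let updated = udf.with_updated_config(config);
           match updated {
               // Config-dependent: store the per-session instance.
               Some(session_udf) => {
                   dynamic_map.insert(session_udf.name.clone(), Arc::new(session_udf));
               }
               // Config-independent: keep the shared instance as-is.
               None => {
                   static_map.insert(udf.name.clone(), udf);
               }
           }
       }
       (Arc::new(static_map), dynamic_map)
   }
   ```
   
   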
   
   ```rust
   // Routing logic inside build()
   for udf in scalar_functions_vec {
       match udf.inner().with_updated_config(config_options) {
           // config-dependent: with_updated_config returns a new ScalarUDF
           Some(session_udf) => {
               dynamic_map.insert(..., Arc::new(session_udf));
           }
           // config-independent: keep the shared instance
           None => {
               static_map.insert(..., udf);
           }
       }
   }
   let static_arc = Arc::new(static_map);  // freeze once build() is done
   ```
   
   ### The two scalar data structures
   
   | Field | Type | Contents | Lifetime |
   |---|---|---|---|
   | `scalar_functions_static` | `Arc<HashMap<String, Arc<ScalarUDF>>>` | All config-independent scalar UDFs (109 built-in defaults + any user-registered static UDFs). Never written after construction. | Process-wide shared |
   | `scalar_functions_dynamic` | `HashMap<String, Arc<ScalarUDF>>` | Config-dependent scalar UDFs only. Today: 6 entries (`now`, `to_timestamp`, and 4 variants). Per-session, mutated on `SET`/`RESET`. | Per session |
   
   Aggregate and window functions require no split: they are entirely 
config-independent by design. Each becomes a single shared `Arc<HashMap>` with 
no dynamic counterpart.
   
   ### Function lookup
   
   `FunctionRegistry::udf(name)` probes the static map first (it holds 109 of the 115 built-in scalar UDFs, so most lookups resolve there), then falls back to the dynamic map:
   
   ```rust
   fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
       self.scalar_functions_static
           .get(name)
           .or_else(|| self.scalar_functions_dynamic.get(name))
           .cloned()
           .ok_or_else(|| plan_datafusion_err!(...))
   }
   ```
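   The lookup order can be modelled the same way with std types. `Udf` and `ScalarFunctions` below are hypothetical stand-ins, and the `String` error replaces DataFusion's planning error.

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   /// Hypothetical stand-in for `ScalarUDF`.
   #[derive(Debug, PartialEq)]
   pub struct Udf(pub &'static str);

   /// Sketch of the proposed scalar storage on `SessionState`.
   pub struct ScalarFunctions {
       pub static_map: Arc<HashMap<String, Arc<Udf>>>,
       pub dynamic_map: HashMap<String, Arc<Udf>>,
   }

   impl ScalarFunctions {
       /// Static map first, dynamic map as the fallback.
       pub fn udf(&self, name: &str) -> Result<Arc<Udf>, String> {
           self.static_map
               .get(name)
               .or_else(|| self.dynamic_map.get(name))
               .cloned()
               .ok_or_else(|| format!("unknown scalar function: {name}"))
       }
   }
   ```

   Because the static map is probed first, an entry in the dynamic map can never shadow a static entry with the same name, so the two maps are expected to hold disjoint names.
   
   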
   
   ---
   
   ## Initialization
   
   ### System initialization
   
   #### Before
   
   No system initialization exists today. Everything is deferred to session 
creation time. The `LazyLock` singletons for individual function objects are 
initialized on first use, but the *containers* (Vecs and HashMaps) that hold 
them are always built fresh per session.
   
   #### After
   
   Three `LazyLock`-backed static registries are introduced alongside `SessionStateDefaults` (Rust does not allow `static` items inside an `impl` block), built once on first access and shared across all sessions:
   
   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, LazyLock};

   use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

   // New module-level statics next to SessionStateDefaults, built on first access
   static DEFAULT_SCALAR_REGISTRY: LazyLock<Arc<HashMap<String, Arc<ScalarUDF>>>> =
       // NEW private fn: iterates all_default_functions(),
       // filters out config-dependent ones, builds the HashMap
       LazyLock::new(|| build_static_scalar_map());

   static DEFAULT_AGGREGATE_REGISTRY: LazyLock<Arc<HashMap<String, Arc<AggregateUDF>>>> =
       // NEW private fn: wraps all_default_aggregate_functions() into a HashMap
       LazyLock::new(|| build_aggregate_map());

   static DEFAULT_WINDOW_REGISTRY: LazyLock<Arc<HashMap<String, Arc<WindowUDF>>>> =
       // NEW private fn: wraps all_default_window_functions() into a HashMap
       LazyLock::new(|| build_window_map());

   impl SessionStateDefaults {
       // Public accessors: each call is one Arc::clone
       pub fn default_scalar_registry() -> Arc<HashMap<String, Arc<ScalarUDF>>> {
           Arc::clone(&DEFAULT_SCALAR_REGISTRY)
       }
       // default_aggregate_registry() and default_window_registry() follow the same pattern
   }
   ```
   
   A separate `Vec<Arc<ScalarUDF>>` of prototype instances is also kept for the 
6 config-dependent scalar functions (built once with 
`ConfigOptions::default()`). These are used at session creation time to produce 
per-session instances cheaply via `with_updated_config`.
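   The once-per-process behaviour of a `LazyLock` registry can be checked in isolation. A minimal sketch, assuming Rust 1.80+ (where `std::sync::LazyLock` is stable); `Udaf`, `BUILDS`, and the three default names are hypothetical stand-ins for `AggregateUDF` and `all_default_aggregate_functions()`.

   ```rust
   use std::collections::HashMap;
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::{Arc, LazyLock};

   /// Hypothetical stand-in for `AggregateUDF`.
   #[derive(Debug)]
   pub struct Udaf(pub &'static str);

   pub type Registry = HashMap<String, Arc<Udaf>>;

   /// Counts how many times the registry is built (illustration only).
   static BUILDS: AtomicUsize = AtomicUsize::new(0);

   /// Stand-in for `build_aggregate_map()`.
   fn build_aggregate_map() -> Arc<Registry> {
       BUILDS.fetch_add(1, Ordering::SeqCst);
       // Hypothetical defaults in place of all_default_aggregate_functions().
       let map: Registry = ["count", "sum", "avg"]
           .iter()
           .map(|&n| (n.to_string(), Arc::new(Udaf(n))))
           .collect();
       Arc::new(map)
   }

   static DEFAULT_AGGREGATE_REGISTRY: LazyLock<Arc<Registry>> =
       LazyLock::new(build_aggregate_map);

   /// The first call builds the map; every call returns one `Arc::clone`.
   pub fn default_aggregate_registry() -> Arc<Registry> {
       Arc::clone(&*DEFAULT_AGGREGATE_REGISTRY)
   }

   pub fn build_count() -> usize {
       BUILDS.load(Ordering::SeqCst)
   }
   ```
   
   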
   
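   Callers who add custom functions to the defaults perform their one-time setup here as well. For config-independent functions (Scenario 2), they clone the relevant inner `HashMap`, insert their functions, and freeze the result into their own `Arc`. For config-dependent functions (Scenario 3), they create the prototype instance once.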
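   The clone-insert-freeze step is copy-on-write: the process-wide defaults stay untouched and the caller gets a new frozen map. The issue uses `register_into_map` in Scenario 2 without defining it; the version below is an assumption (it registers a function under its name and each alias), and `Udf` is a hypothetical stand-in for `ScalarUDF`.

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   /// Hypothetical stand-in for `ScalarUDF`.
   #[derive(Debug)]
   pub struct Udf {
       pub name: String,
       pub aliases: Vec<String>,
   }

   pub type Registry = HashMap<String, Arc<Udf>>;

   /// Assumed behaviour of `register_into_map`: one entry per name and alias.
   pub fn register_into_map(map: &mut Registry, udf: Arc<Udf>) {
       for alias in &udf.aliases {
           map.insert(alias.clone(), Arc::clone(&udf));
       }
       map.insert(udf.name.clone(), udf);
   }

   /// One-time startup step: copy the shared defaults, add the custom
   /// functions, and freeze the result into a new `Arc`.
   pub fn extend_defaults(defaults: &Arc<Registry>, custom: Vec<Arc<Udf>>) -> Arc<Registry> {
       let mut map = defaults.as_ref().clone();
       for udf in custom {
           register_into_map(&mut map, udf);
       }
       Arc::new(map)
   }
   ```
   
   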
   
   ### Session initialization
   
   #### Before
   
   `with_default_features()` calls 
`SessionStateDefaults::default_scalar_functions()`, 
`default_aggregate_functions()`, and `default_window_functions()` — three 
uncached functions that allocate fresh `Vec`s and perform ~165 `Arc::clone`s on 
every call. `build()` then walks those Vecs, calls `with_updated_config` on 
every scalar UDF (~115 calls), and inserts every function into fresh `HashMap`s 
with heap-allocated `String` keys.
   
   Additionally, `task_ctx()` — called once per physical plan execution — 
clones all three `HashMap`s in full into a new `TaskContext`, doubling the 
allocation cost.
   
   #### After
   
   `with_default_features()` internally calls `default_scalar_registry()`, 
`default_aggregate_registry()`, and `default_window_registry()`, assigning the 
resulting `Arc`s directly to the builder's static fields. The Vec collection 
and HashMap construction loops are skipped entirely for config-independent 
functions. The `build()` loop only runs over the 6 prototype entries to produce 
the per-session dynamic map.
   
   `task_ctx()` now performs 3 `Arc::clone`s for the static maps and a small 
`HashMap::clone` of ~6–8 entries for the dynamic map — instead of cloning three 
full HashMaps.
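   A simplified model of the proposed `From<&SessionState> for TaskContext` conversion follows. The structs keep only the function fields named in the issue, and all three function kinds use the same hypothetical `Udf` stand-in to keep the sketch short.

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   /// Hypothetical stand-in for `ScalarUDF` / `AggregateUDF` / `WindowUDF`.
   #[derive(Debug)]
   pub struct Udf(pub &'static str);

   pub type Map = HashMap<String, Arc<Udf>>;

   /// Simplified shape of the proposed `SessionState` function fields.
   pub struct SessionState {
       pub scalar_functions_static: Arc<Map>,
       pub scalar_functions_dynamic: Map,
       pub aggregate_functions: Arc<Map>,
       pub window_functions: Arc<Map>,
   }

   /// Simplified shape of the proposed `TaskContext` function fields.
   pub struct TaskContext {
       pub scalar_functions_static: Arc<Map>,
       pub scalar_functions_dynamic: Map,
       pub aggregate_functions: Arc<Map>,
       pub window_functions: Arc<Map>,
   }

   impl From<&SessionState> for TaskContext {
       fn from(state: &SessionState) -> Self {
           TaskContext {
               // Three reference-count increments instead of three full map clones.
               scalar_functions_static: Arc::clone(&state.scalar_functions_static),
               aggregate_functions: Arc::clone(&state.aggregate_functions),
               window_functions: Arc::clone(&state.window_functions),
               // Only the small per-session map is actually copied.
               scalar_functions_dynamic: state.scalar_functions_dynamic.clone(),
           }
       }
   }
   ```
   
   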
   
   ---
   
   ## Scenarios
   
   ### Scenario 1 — Standard: no custom functions
   
   A caller uses only DataFusion's built-in functions. No startup step is 
needed in either version. The caller code is identical before and after — the 
optimisation is entirely internal to `with_default_features()`.
   
   <table>
   <tr>
   <th>Before — per session</th>
   <th>After — per session</th>
   </tr>
   <tr>
   <td>
   
   - 3 `Vec` allocations
   - ~165 `Arc::clone` into Vecs
   - 3 `HashMap` allocations
   - ~180+ `HashMap::insert` (names + aliases)
   - ~180+ `String` key allocations
   - ~115 `with_updated_config` calls
   - ❌ + TaskContext: 3 more HashMap clones
   
   </td>
   <td>
   
   - 3 `Arc::clone` (static maps)
   - 6 new `ScalarUDF` instances (from prototypes)
   - 1 small `HashMap` allocation (~8 entries)
   - ~8 `String` key allocations
   - 6 `with_updated_config` calls
   - ✅ + TaskContext: 3 `Arc::clone` + 1 small map clone
   
   </td>
   </tr>
   </table>
   
   **Before & After — per session**
   
   ```rust
   // ── Startup ──────────────────────────────────────────────────────────────
   // (nothing, in either version)

   // ── Per session: BEFORE ─────────────────────────────────────────────────
   let state = SessionStateBuilder::new()
       .with_default_features()   // ~165 Arc::clones + fresh Vecs + HashMaps
       .with_config(config)
       .build();

   // ── Per session: AFTER (caller code identical) ──────────────────────────
   let state = SessionStateBuilder::new()
       .with_default_features()   // now: 3 Arc::clones + 6 dynamic instances
       .with_config(config)
       .build();
   ```
   
   ---
   
   ### Scenario 2 — Custom immutable functions
   
   A caller registers one custom config-independent scalar UDF 
(`my_scalar_udf`) and one custom aggregate UDAF (`my_aggregate_udaf`). In the 
new design these are merged into the shared registry once at startup so that 
per-session cost is identical to Scenario 1.
   
   <table>
   <tr>
   <th>Before — per session</th>
   <th>After</th>
   </tr>
   <tr>
   <td>
   
   - Same as Scenario 1 before
   - +1 `Arc::clone` (custom UDF into Vec)
   - +1 `Arc::clone` (custom UDAF into Vec)
   - +2 `HashMap::insert`
   - +2 `String` allocs
   - ❌ All repeated on every session
   
   </td>
   <td>
   
   **Startup (once):** clone inner HashMaps, insert custom functions, freeze 
into new `Arc`s
   
   **Per session:** identical to Scenario 1 after — custom functions already in 
shared maps
   
   </td>
   </tr>
   </table>
   
   **Before**
   
   ```rust
   // ── Startup ──────────────────────────────────────────────────────────────
   // (nothing)

   // ── Per session ──────────────────────────────────────────────────────────
   let mut builder = SessionStateBuilder::new()
       .with_default_features()
       .with_config(config);
   builder.scalar_functions().get_or_insert_default()
       .push(my_scalar_udf());      // appended to the ~115 DF defaults
   builder.aggregate_functions().get_or_insert_default()
       .push(my_aggregate_udaf());  // appended to the ~38 DF defaults
   let state = builder.build();    // ~116 scalar + ~39 aggregate functions inserted (plus aliases)
   ```
   
   **After**
   
   ```rust
   // ── Startup (once) ───────────────────────────────────────────────────────
   let mut scalar_functions =
       SessionStateDefaults::default_scalar_registry().as_ref().clone();
   // my_scalar_udf is config-independent: with_updated_config returns None
   register_into_map(&mut scalar_functions, my_scalar_udf());
   let scalar_functions = Arc::new(scalar_functions);  // shadow: HashMap → Arc (frozen)

   let mut aggregate_functions =
       SessionStateDefaults::default_aggregate_registry().as_ref().clone();
   register_into_map(&mut aggregate_functions, my_aggregate_udaf());
   let aggregate_functions = Arc::new(aggregate_functions);  // shadow: HashMap → Arc (frozen)

   let window_functions = SessionStateDefaults::default_window_registry();  // no custom window UDFs

   // ── Per session ──────────────────────────────────────────────────────────
   // 3 Arc::clones: all 167 functions available, static maps not rebuilt
   let state = SessionStateBuilder::new()
       .with_shared_scalar_functions(Arc::clone(&scalar_functions))
       .with_shared_aggregate_functions(Arc::clone(&aggregate_functions))
       .with_shared_window_functions(Arc::clone(&window_functions))
       .with_config(config)
       .build();
   ```
   
   ---
   
   ### Scenario 3 — Custom mutable (config-dependent) function
   
   A caller provides a custom scalar UDF `format_timestamp` whose return type depends on the session locale. It overrides `with_updated_config`, so the routing signal sends it to the dynamic map automatically. In the new design the `build()` loop runs only over the config-dependent functions (the 6 built-in prototypes plus this one), not over all ~115 default scalar UDFs.
   
   <table>
   <tr>
   <th>Before — per session</th>
   <th>After — per session</th>
   </tr>
   <tr>
   <td>
   
   - Same as Scenario 1 before
   - +1 `Arc::clone` (into Vec)
   - +1 new `ScalarUDF` instance (from `with_updated_config`)
   - +1 `HashMap::insert`
   - +1 `String` alloc
   - ❌ `with_updated_config` called on all ~115 scalar UDFs needlessly
   
   </td>
   <td>
   
   - Same as Scenario 1 after
   - +1 new `ScalarUDF` instance (from prototype)
   - +1 `HashMap::insert`
   - +1 `String` alloc
   - ✅ `with_updated_config` called on 7 functions only (6 built-in + 1 custom)
   
   </td>
   </tr>
   </table>
   
   **Before**
   
   ```rust
   // ── Startup ──────────────────────────────────────────────────────────────
   // (nothing)

   // ── Per session ──────────────────────────────────────────────────────────
   // build() calls with_updated_config on all ~115 scalar UDFs
   let mut builder = SessionStateBuilder::new()
       .with_default_features()
       .with_config(config);
   builder.scalar_functions().get_or_insert_default().push(format_timestamp_udf());
   let state = builder.build();
   ```
   
   **After**
   
   ```rust
   // ── Startup (once) ───────────────────────────────────────────────────────
   let prototype = format_timestamp_udf();  // default-config instance, stored on the runtime

   // ── Per session ──────────────────────────────────────────────────────────
   // with_default_features() uses Arc::clone for the static maps;
   // build() calls with_updated_config only on the config-dependent prototypes
   // (6 built-in + this one); format_timestamp returns Some and routes to the dynamic map
   let mut builder = SessionStateBuilder::new()
       .with_default_features()
       .with_config(config);
   builder.scalar_functions().get_or_insert_default().push(Arc::clone(&prototype));
   let state = builder.build();
   // format_timestamp is in scalar_functions_dynamic, built with the session config;
   // a SET statement refreshes it via with_updated_config
   ```
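   The `SET` refresh mentioned in the comments above can be sketched as follows, assuming a hypothetical `Session` that owns its config and dynamic map; `TzUdf` stands in for a config-dependent UDF such as `now`. Only the per-session map is touched, so the shared static maps never need to be copied.

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   /// Hypothetical stand-in for the session's config options.
   #[derive(Clone, Debug, Default)]
   pub struct Config {
       pub timezone: Option<String>,
   }

   /// Hypothetical config-dependent scalar function.
   #[derive(Debug)]
   pub struct TzUdf {
       pub name: String,
       pub timezone: Option<String>,
   }

   impl TzUdf {
       /// Same contract as `with_updated_config`; always `Some` here because
       /// this stand-in is config-dependent by construction.
       pub fn with_updated_config(&self, config: &Config) -> Option<TzUdf> {
           Some(TzUdf { name: self.name.clone(), timezone: config.timezone.clone() })
       }
   }

   /// Hypothetical session holding only what the refresh needs.
   pub struct Session {
       pub config: Config,
       pub dynamic: HashMap<String, Arc<TzUdf>>,
   }

   impl Session {
       /// Models `SET` of the timezone: rebuilds only the dynamic entries.
       pub fn set_timezone(&mut self, tz: &str) {
           self.config.timezone = Some(tz.to_string());
           for udf in self.dynamic.values_mut() {
               if let Some(updated) = udf.with_updated_config(&self.config) {
                   *udf = Arc::new(updated);
               }
           }
       }
   }
   ```
   
   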
   
   ---
   
   ## API Changes
   
   ### Internal struct changes
   
   Private fields on `SessionState` and `TaskContext` change as follows. These 
are not directly part of the public API but drive all downstream changes.
   
   | Struct | Field | Before | After |
   |---|---|---|---|
   | `SessionState` | `scalar_functions` | `HashMap<String, Arc<ScalarUDF>>` | split into `scalar_functions_static: Arc<HashMap>` and `scalar_functions_dynamic: HashMap` |
   | `SessionState` | `aggregate_functions` | `HashMap<String, Arc<AggregateUDF>>` | `Arc<HashMap<String, Arc<AggregateUDF>>>` |
   | `SessionState` | `window_functions` | `HashMap<String, Arc<WindowUDF>>` | `Arc<HashMap<String, Arc<WindowUDF>>>` |
   | `TaskContext` | `scalar_functions` | `HashMap<String, Arc<ScalarUDF>>` | same split as `SessionState` |
   | `TaskContext` | `aggregate_functions` | `HashMap<String, Arc<AggregateUDF>>` | `Arc<HashMap<String, Arc<AggregateUDF>>>` |
   | `TaskContext` | `window_functions` | `HashMap<String, Arc<WindowUDF>>` | `Arc<HashMap<String, Arc<WindowUDF>>>` |
   | `SessionStateBuilder` | `build()` scalar loop | routes all scalar UDFs into one `HashMap` | routes via `with_updated_config` into static or dynamic map; freezes static into `Arc` |
   
   ### New public APIs (additive — no breaking change)
   
   | Location | New symbol | Purpose |
   |---|---|---|
   | `SessionStateDefaults` | `default_scalar_registry() -> Arc<HashMap<String, Arc<ScalarUDF>>>` | `LazyLock`-backed process-wide static scalar registry (config-independent only). One `Arc::clone` per call. |
   | `SessionStateDefaults` | `default_aggregate_registry() -> Arc<HashMap<String, Arc<AggregateUDF>>>` | Same for aggregates. |
   | `SessionStateDefaults` | `default_window_registry() -> Arc<HashMap<String, Arc<WindowUDF>>>` | Same for window functions. |
   | `SessionStateBuilder` | `with_shared_scalar_functions(Arc<HashMap>)` | Inject a pre-built static scalar registry. When set, `build()` skips the static construction loop entirely. |
   | `SessionStateBuilder` | `with_shared_aggregate_functions(Arc<HashMap>)` | Same for aggregates. |
   | `SessionStateBuilder` | `with_shared_window_functions(Arc<HashMap>)` | Same for window functions. |
   
   ### Breaking public API changes
   
   > ℹ️ **Rationale for bundling all breaking changes in one PR:** the 
mandatory breaks (`scalar_functions()` and `TaskContext::new()`) cannot be 
avoided cleanly. Since the PR is already a breaking change, removing the three 
`default_*_functions()` methods in the same PR avoids a separate deprecation PR 
later and leaves the API in a cleaner final state. The three removals are **not 
mandatory** — deprecation is a valid alternative path — but the cost of 
bundling them now is low given the context.
   
   > ⚠️ **Breaking surface is narrow** — normal query-serving code uses 
`FunctionRegistry::udf(name)`, which is unchanged. Only code that iterates all 
scalar functions, constructs `TaskContext` directly, or calls the 
`default_*_functions()` helpers is affected.
   
   | Location | Symbol | Mandatory? | Change | Migration |
   |---|---|---|---|---|
   | `SessionState` | `scalar_functions() -> &HashMap<…>` | 🔴 Mandatory | Cannot return a unified `&HashMap` across static and dynamic maps without allocation. Split into `static_scalar_functions()` and `dynamic_scalar_functions()`. | Update callers to use the appropriate getter. Most callers only need `FunctionRegistry::udf(name)` and are unaffected. |
   | `TaskContext` | `scalar_functions() -> &HashMap<…>` | 🔴 Mandatory | Same split as `SessionState`. | Same as above. |
   | `TaskContext` | `TaskContext::new(…)` | 🔴 Mandatory | Scalar parameter splits into two; aggregate and window parameters become `Arc<HashMap>`. | Update call sites (tests, custom executors) to pass the split parameters. |
   | `SessionStateDefaults` | `default_scalar_functions() -> Vec<…>` | 🟡 Not mandatory | Semantically incompatible with the new design: returns config-dependent functions that are no longer in the static registry. Removing it avoids a silent semantic trap for callers who expect all defaults. Replace with `default_scalar_registry()`. | Could be deprecated instead of removed. Recommended to remove given the mandatory breaks already present. |
   | `SessionStateDefaults` | `default_aggregate_functions() -> Vec<…>` | 🟡 Not mandatory | Semantically equivalent to `default_aggregate_registry()` but returns a `Vec` instead of an `Arc<HashMap>`. Replace with `default_aggregate_registry()`. | Could be deprecated instead of removed. Recommended to remove for consistency. |
   | `SessionStateDefaults` | `default_window_functions() -> Vec<…>` | 🟡 Not mandatory | Same as `default_aggregate_functions()` above. Replace with `default_window_registry()`. | Could be deprecated instead of removed. Recommended to remove for consistency. |
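
   Code that used to iterate the single map returned by `scalar_functions()` can chain the two split maps instead. A minimal sketch with std types, where `Udf` is a hypothetical stand-in for `ScalarUDF`; the two arguments would come from the proposed `static_scalar_functions()` and `dynamic_scalar_functions()` getters (an `&Arc<HashMap<…>>` deref-coerces to `&HashMap<…>`).

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   /// Hypothetical stand-in for `ScalarUDF`.
   #[derive(Debug)]
   pub struct Udf(pub &'static str);

   /// Iterates every registered scalar function across both maps,
   /// replacing iteration over the old single `scalar_functions()` map.
   pub fn all_scalar_functions<'a>(
       static_map: &'a HashMap<String, Arc<Udf>>,
       dynamic_map: &'a HashMap<String, Arc<Udf>>,
   ) -> impl Iterator<Item = (&'a String, &'a Arc<Udf>)> + 'a {
       static_map.iter().chain(dynamic_map.iter())
   }
   ```

   If a name were ever present in both maps, this iterator would yield it twice, whereas `FunctionRegistry::udf` returns the static entry.
   
   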
   

