fred1268 opened a new issue, #22614:
URL: https://github.com/apache/datafusion/issues/22614
### Describe the bug
## Problem Statement
In DataFusion, functions are registered into the session state on every
session creation. All three function categories — scalar, aggregate, and window
— are collected into fresh `Vec`s and then inserted individually into new
`HashMap`s. This work is repeated identically for every session, regardless of
how many sessions are created or how fast they are created.
### Function counts (at time of writing)
| Category | Count | Source | Config-dependent |
|---|---|---|---|
| Scalar UDFs | 115 | `all_default_functions()`: math, string, datetime, regex, crypto, unicode, encoding, core | 6 |
| Aggregate UDFs | 38 | `all_default_aggregate_functions()` | 0 |
| Window UDFs | 12 | `all_default_window_functions()` | 0 |
| **Total** | **165** | | **6** |
### Per-session initialization cost
Each call to `SessionStateBuilder::build()` — which happens on every session
creation — pays the following costs:
| Operation | Count | Detail |
|---|---|---|
| `Arc::clone` | ~220–250 | into Vecs + into HashMaps + alias clones |
| `HashMap::insert` | ~180+ | primary names + aliases across all 3 maps |
| `String` allocs | ~180+ | one heap key per HashMap entry |
| `Vec`/`HashMap` allocs | ~6 | 3 collector Vecs + 3 result HashMaps |
### The TaskContext double-copy
Before any physical plan is executed, DataFusion calls
`session_state.task_ctx()`, which creates a `TaskContext` from the session
state via `From<&SessionState>`. `TaskContext` owns its own independent copies
of all three function maps, cloned in full from the session state:
```rust
// session_state.rs:2288 — full clone of all three HashMaps
impl From<&SessionState> for TaskContext {
    fn from(state: &SessionState) -> Self {
        TaskContext::new(
            ...,
            state.scalar_functions.clone(),    // full HashMap clone
            state.aggregate_functions.clone(), // full HashMap clone
            state.window_functions.clone(),    // full HashMap clone
            ...
        )
    }
}
```
This means the function maps are materialised **twice**: once per session, when
`build()` constructs the `SessionState`, and again before every physical plan
execution, when `task_ctx()` converts it to a `TaskContext`.
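The difference between cloning a map of `Arc`s and cloning an `Arc` of a map can be seen in isolation. The following is a minimal sketch using only the standard library; `Udf`, `build_registry`, `deep_copy`, and `shared_copy` are hypothetical stand-ins and not DataFusion types or functions.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a function object such as `ScalarUDF`.
#[derive(Debug)]
pub struct Udf {
    pub name: String,
}

/// Builds a registry of `n` dummy functions keyed by name.
pub fn build_registry(n: usize) -> HashMap<String, Arc<Udf>> {
    (0..n)
        .map(|i| {
            let name = format!("fn_{i}");
            (name.clone(), Arc::new(Udf { name }))
        })
        .collect()
}

/// Deep copy: allocates a new table, clones every `String` key, and
/// increments the reference count of every value.
pub fn deep_copy(map: &HashMap<String, Arc<Udf>>) -> HashMap<String, Arc<Udf>> {
    map.clone()
}

/// Shared copy: one reference-count increment on the outer `Arc`,
/// no allocation, and the entries themselves are not touched.
pub fn shared_copy(
    map: &Arc<HashMap<String, Arc<Udf>>>,
) -> Arc<HashMap<String, Arc<Udf>>> {
    Arc::clone(map)
}
```

After `deep_copy`, every entry has a strong count of 2; after `shared_copy`, only the outer `Arc` does, and both handles point at the same allocation.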
### The waste: 6 session-dependent functions out of 165
Only 6 scalar UDFs are actually session-dependent, meaning their return
*type* changes with session configuration (specifically the timezone setting):
| Function | Reason for session-dependency |
|---|---|
| `now` / `current_timestamp` | Return type is `Timestamp(ns, timezone)`; the timezone is per-session |
| `to_timestamp` | Same |
| `to_timestamp_seconds` | Same |
| `to_timestamp_millis` | Same |
| `to_timestamp_micros` | Same |
| `to_timestamp_nanos` | Same |
Aggregate and window functions are **structurally incapable** of being
session-dependent: their impl traits (`AggregateUDFImpl`, `WindowUDFImpl`) do
not define a `with_updated_config` method at all. This is not a temporary
limitation — it is a design boundary.
> ⚠️ **Summary:** every session creation pays to rebuild the registries for all 165 functions, and every query execution pays to clone them again into a `TaskContext`, yet only 6 of those 165 functions actually differ between sessions.
### To Reproduce
_No response_
### Expected behavior
Session creation should not rebuild function registries from scratch on
every call.
The 165 built-in functions are identical across all sessions. Only 6 scalar
UDFs (`now`, `to_timestamp`, and 4 variants) carry session-specific state (the
configured timezone in their return type). The remaining 159 functions never
change.
**Today — per session creation:**
| Operation | Count |
|---|---|
| `Arc::clone` | ~220–250 |
| `HashMap::insert` | ~180+ |
| `String` heap allocations | ~180+ |
| `Vec`/`HashMap` allocations | ~6 |
The same cost is paid a second time when `task_ctx()` converts the
`SessionState` into a `TaskContext` before each physical plan execution.
**Expected — per session creation:**
| Operation | Count |
|---|---|
| `Arc::clone` | 3 (one per shared registry) |
| `HashMap::insert` | ~8 (dynamic map for the 6 config-dependent functions only) |
| `String` heap allocations | ~8 |
| `Vec`/`HashMap` allocations | 1 (the small dynamic map) |
The 3 shared registries (`Arc<HashMap>`) are built once at process start and
reused across all sessions. `TaskContext` creation also becomes 3 `Arc::clone`s
instead of a full HashMap clone.
### Additional context
## Proposed Design
The idea is to split function initialization into two phases: a **system
initialization** that runs once per process, and a **session initialization**
that is reduced to a handful of cheap pointer copies.
| System initialization (once) | Session initialization (per session) |
|---|---|
| Build and freeze shared, immutable registries: | 3 `Arc::clone`s, one per shared map. |
| - all config-independent scalar UDFs → `Arc<HashMap>` | Call `with_updated_config(session_config)` on each prototype → produce 6 per-session scalar instances → insert into a small per-session `HashMap`. |
| - all aggregate UDFs → `Arc<HashMap>` | `TaskContext` creation: 3 more `Arc::clone`s + clone of the small dynamic map. |
| - all window UDFs → `Arc<HashMap>` | |
| - prototype instances of the 6 config-dependent scalars → `Vec<Arc<ScalarUDF>>` | |
### The routing signal
The `ScalarUDFImpl::with_updated_config` method already exists and already
expresses exactly the config-dependency concept. The `build()` loop already
calls it on every scalar UDF. No new trait method is needed: if
`with_updated_config` returns `Some`, the function is config-dependent and
routes to the per-session dynamic map; if it returns `None`, it routes to the
shared static map.
```rust
// Routing logic inside build()
for udf in scalar_functions_vec {
    match udf.inner().with_updated_config(config_options) {
        Some(session_udf) => {
            dynamic_map.insert(..., session_udf); // config-dependent
        }
        None => {
            static_map.insert(..., udf); // config-independent
        }
    }
}
let static_arc = Arc::new(static_map); // freeze once build() is done
```
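To make the routing rule concrete without depending on DataFusion, here is a self-contained sketch. The `Function` trait, `Config`, `Abs`, `Now`, and `route` are all hypothetical stand-ins; in particular the trait is a reduced imitation of `ScalarUDFImpl` and its signatures are assumptions chosen for brevity.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for the session configuration.
pub struct Config {
    pub time_zone: String,
}

/// Hypothetical, reduced imitation of `ScalarUDFImpl`.
pub trait Function {
    fn name(&self) -> &str;

    /// `Some(_)` signals a config-dependent function; the default `None`
    /// signals a config-independent one.
    fn with_updated_config(&self, _config: &Config) -> Option<Arc<dyn Function>> {
        None
    }
}

/// Config-independent example: relies on the default `None`.
pub struct Abs;
impl Function for Abs {
    fn name(&self) -> &str {
        "abs"
    }
}

/// Config-dependent example: carries the session time zone.
pub struct Now {
    pub time_zone: String,
}
impl Function for Now {
    fn name(&self) -> &str {
        "now"
    }
    fn with_updated_config(&self, config: &Config) -> Option<Arc<dyn Function>> {
        Some(Arc::new(Now { time_zone: config.time_zone.clone() }))
    }
}

pub type Registry = HashMap<String, Arc<dyn Function>>;

/// Splits `functions` into a frozen shared map and a per-session map,
/// using the return value of `with_updated_config` as the only signal.
pub fn route(functions: Vec<Arc<dyn Function>>, config: &Config) -> (Arc<Registry>, Registry) {
    let mut static_map = Registry::new();
    let mut dynamic_map = Registry::new();
    for f in functions {
        match f.with_updated_config(config) {
            Some(session_fn) => {
                dynamic_map.insert(session_fn.name().to_string(), session_fn);
            }
            None => {
                static_map.insert(f.name().to_string(), f);
            }
        }
    }
    (Arc::new(static_map), dynamic_map)
}
```

Passing `Abs` and `Now` through `route` places `abs` in the shared map and `now` in the per-session map, with no extra marker trait or flag.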
### The two scalar data structures
| Field | Type | Contents | Lifetime |
|---|---|---|---|
| `scalar_functions_static` | `Arc<HashMap<String, Arc<ScalarUDF>>>` | All config-independent scalar UDFs (109 built-in defaults + any user-registered static UDFs). Never written after construction. | Process-wide shared |
| `scalar_functions_dynamic` | `HashMap<String, Arc<ScalarUDF>>` | Config-dependent scalar UDFs only. Today: 6 entries (`now`, `to_timestamp`, and 4 variants). Per-session, mutated on `SET`/`RESET`. | Per session |
Aggregate and window functions require no split: they are entirely
config-independent by design. Each becomes a single shared `Arc<HashMap>` with
no dynamic counterpart.
### Function lookup
`FunctionRegistry::udf(name)` probes the static map first (hits ~98% of
lookups), then falls back to the dynamic map:
```rust
fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
    self.scalar_functions_static
        .get(name)
        .or_else(|| self.scalar_functions_dynamic.get(name))
        .cloned()
        .ok_or_else(|| plan_datafusion_err!(...))
}
```
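A runnable version of the same two-tier probe, written against the standard library only, might look like the sketch below. `Udf` and `Registry` are hypothetical stand-ins, and a plain `String` error replaces DataFusion's error type.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for `ScalarUDF`.
#[derive(Debug)]
pub struct Udf {
    pub name: String,
}

/// Hypothetical holder of the two scalar maps.
pub struct Registry {
    pub shared: Arc<HashMap<String, Arc<Udf>>>,
    pub per_session: HashMap<String, Arc<Udf>>,
}

impl Registry {
    /// Probes the shared map first, then falls back to the per-session map.
    pub fn udf(&self, name: &str) -> Result<Arc<Udf>, String> {
        self.shared
            .get(name)
            .or_else(|| self.per_session.get(name))
            .cloned()
            .ok_or_else(|| format!("no scalar function named '{name}'"))
    }
}
```

One consequence of this probe order is that a name present in both maps resolves to the shared entry, so the routing step has to keep the two key sets disjoint.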
---
## Initialization
### System initialization
#### Before
No system initialization exists today. Everything is deferred to session
creation time. The `LazyLock` singletons for individual function objects are
initialized on first use, but the *containers* (Vecs and HashMaps) that hold
them are always built fresh per session.
#### After
Three `LazyLock`-backed static registries are introduced in
`SessionStateDefaults`, built once on first access and shared across all
sessions:
```rust
// New statics inside SessionStateDefaults

// build_static_scalar_map() is a NEW private fn: iterates all_default_functions(),
// filters out config-dependent ones, builds a HashMap
static DEFAULT_SCALAR_REGISTRY: LazyLock<Arc<HashMap<String, Arc<ScalarUDF>>>> =
    LazyLock::new(|| build_static_scalar_map());

// build_aggregate_map() is a NEW private fn: wraps
// all_default_aggregate_functions() into a HashMap
static DEFAULT_AGGREGATE_REGISTRY: LazyLock<Arc<HashMap<String, Arc<AggregateUDF>>>> =
    LazyLock::new(|| build_aggregate_map());

// build_window_map() is a NEW private fn: wraps
// all_default_window_functions() into a HashMap
static DEFAULT_WINDOW_REGISTRY: LazyLock<Arc<HashMap<String, Arc<WindowUDF>>>> =
    LazyLock::new(|| build_window_map());

// Public accessors: each call is one Arc::clone
pub fn default_scalar_registry() -> Arc<HashMap<String, Arc<ScalarUDF>>> {
    Arc::clone(&DEFAULT_SCALAR_REGISTRY)
}
// default_aggregate_registry() and default_window_registry() follow the same pattern
```
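The build-once, share-everywhere behaviour of such a static can be checked with a standalone sketch. It relies on `std::sync::LazyLock` (stable since Rust 1.80); `Udf`, `DEFAULT_REGISTRY`, `default_registry`, and the three function names are placeholders chosen for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};

/// Hypothetical stand-in for a function object.
#[derive(Debug)]
pub struct Udf {
    pub name: &'static str,
}

pub type Registry = Arc<HashMap<String, Arc<Udf>>>;

/// Built on first access, then reused for the lifetime of the process.
static DEFAULT_REGISTRY: LazyLock<Registry> = LazyLock::new(|| {
    // Placeholder contents; a real registry would be filled from the defaults.
    let names = ["abs", "lower", "sha256"];
    let map: HashMap<String, Arc<Udf>> = names
        .iter()
        .map(|&name| (name.to_string(), Arc::new(Udf { name })))
        .collect();
    Arc::new(map)
});

/// Each call is a single `Arc::clone` of the shared registry.
pub fn default_registry() -> Registry {
    Arc::clone(&*DEFAULT_REGISTRY)
}
```

Two calls to `default_registry()` return handles to the same allocation, which `Arc::ptr_eq` confirms.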
A separate `Vec<Arc<ScalarUDF>>` of prototype instances is also kept for the
6 config-dependent scalar functions (built once with
`ConfigOptions::default()`). These are used at session creation time to produce
per-session instances cheaply via `with_updated_config`.
Callers who add custom functions to the defaults (see Scenarios 2 and 3)
perform their one-time setup here: clone the relevant inner `HashMap`, insert
their functions, and freeze the result into their own `Arc`.
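The clone, insert, freeze sequence described here is a small copy-on-extend step. A minimal sketch, assuming hypothetical `Udf`, `Registry`, and `extend_frozen` names:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a function object.
#[derive(Debug)]
pub struct Udf {
    pub name: String,
}

pub type Registry = HashMap<String, Arc<Udf>>;

/// Returns a new frozen registry holding everything in `base` plus `extra`.
/// `base` is not modified, so other holders of that `Arc` are unaffected.
pub fn extend_frozen(base: &Arc<Registry>, extra: Vec<Arc<Udf>>) -> Arc<Registry> {
    // One-time deep copy of the inner map (paid at startup, not per session).
    let mut map: Registry = (**base).clone();
    for udf in extra {
        map.insert(udf.name.clone(), udf);
    }
    // Freeze: after this point the map is only ever shared, never mutated.
    Arc::new(map)
}
```

The deep copy happens once at startup; every later session would only clone the returned `Arc`.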
### Session initialization
#### Before
`with_default_features()` calls
`SessionStateDefaults::default_scalar_functions()`,
`default_aggregate_functions()`, and `default_window_functions()` — three
uncached functions that allocate fresh `Vec`s and perform ~165 `Arc::clone`s on
every call. `build()` then walks those Vecs, calls `with_updated_config` on
every scalar UDF (~115 calls), and inserts every function into fresh `HashMap`s
with heap-allocated `String` keys.
Additionally, `task_ctx()` — called once per physical plan execution —
clones all three `HashMap`s in full into a new `TaskContext`, doubling the
allocation cost.
#### After
`with_default_features()` internally calls `default_scalar_registry()`,
`default_aggregate_registry()`, and `default_window_registry()`, assigning the
resulting `Arc`s directly to the builder's static fields. The Vec collection
and HashMap construction loops are skipped entirely for config-independent
functions. The `build()` loop only runs over the 6 prototype entries to produce
the per-session dynamic map.
`task_ctx()` now performs 3 `Arc::clone`s for the static maps and a small
`HashMap::clone` of ~6–8 entries for the dynamic map — instead of cloning three
full HashMaps.
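The shape of the cheaper conversion can be sketched as follows. `Session` and `Task` are hypothetical simplifications of `SessionState` and `TaskContext`, and a single `Udf` type stands in for scalar, aggregate, and window functions alike.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in used for all three function categories.
#[derive(Debug)]
pub struct Udf {
    pub name: String,
}

pub type Registry = HashMap<String, Arc<Udf>>;

/// Simplified imitation of the proposed `SessionState` fields.
pub struct Session {
    pub scalar_static: Arc<Registry>,
    pub scalar_dynamic: Registry,
    pub aggregates: Arc<Registry>,
    pub windows: Arc<Registry>,
}

/// Simplified imitation of the proposed `TaskContext` fields.
pub struct Task {
    pub scalar_static: Arc<Registry>,
    pub scalar_dynamic: Registry,
    pub aggregates: Arc<Registry>,
    pub windows: Arc<Registry>,
}

impl From<&Session> for Task {
    fn from(s: &Session) -> Self {
        Task {
            // Three pointer copies for the shared maps.
            scalar_static: Arc::clone(&s.scalar_static),
            aggregates: Arc::clone(&s.aggregates),
            windows: Arc::clone(&s.windows),
            // Only the small per-session map is cloned entry by entry.
            scalar_dynamic: s.scalar_dynamic.clone(),
        }
    }
}
```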
---
## Scenarios
### Scenario 1 — Standard: no custom functions
A caller uses only DataFusion's built-in functions. No startup step is
needed in either version. The caller code is identical before and after — the
optimisation is entirely internal to `with_default_features()`.
<table>
<tr>
<th>Before — per session</th>
<th>After — per session</th>
</tr>
<tr>
<td>
- 3 `Vec` allocations
- ~165 `Arc::clone` into Vecs
- 3 `HashMap` allocations
- ~180+ `HashMap::insert` (names + aliases)
- ~180+ `String` key allocations
- ~115 `with_updated_config` calls
- ❌ + TaskContext: 3 more HashMap clones
</td>
<td>
- 3 `Arc::clone` (static maps)
- 6 new `ScalarUDF` instances (prototypes)
- 1 small `HashMap` allocation (~8 entries)
- ~8 `String` key allocations
- 6 `with_updated_config` calls
- ✅ + TaskContext: 3 `Arc::clone` + 1 small map clone
</td>
</tr>
</table>
**Before & After — per session**
```rust
// ── Startup ──────────────────────────────────────────────────────────────────
// (nothing in both versions)

// ── Per session, BEFORE ──────────────────────────────────────────────────────
let state = SessionStateBuilder::new()
    .with_default_features() // ~165 Arc::clones + fresh Vecs + HashMaps
    .with_config(config)
    .build();

// ── Per session, AFTER (caller code identical) ───────────────────────────────
let state = SessionStateBuilder::new()
    .with_default_features() // now: 3 Arc::clones + 6 dynamic instances
    .with_config(config)
    .build();
```
---
### Scenario 2 — Custom immutable functions
A caller registers one custom config-independent scalar UDF
(`my_scalar_udf`) and one custom aggregate UDAF (`my_aggregate_udaf`). In the
new design these are merged into the shared registry once at startup so that
per-session cost is identical to Scenario 1.
<table>
<tr>
<th>Before — per session</th>
<th>After</th>
</tr>
<tr>
<td>
- Same as Scenario 1 before
- +1 `Arc::clone` (custom UDF into Vec)
- +1 `Arc::clone` (custom UDAF into Vec)
- +2 `HashMap::insert`
- +2 `String` allocs
- ❌ All repeated on every session
</td>
<td>
**Startup (once):** clone inner HashMaps, insert custom functions, freeze
into new `Arc`s
**Per session:** identical to Scenario 1 after — custom functions already in
shared maps
</td>
</tr>
</table>
**Before**
```rust
// ── Startup ──────────────────────────────────────────────────────────────────
// (nothing)

// ── Per session ──────────────────────────────────────────────────────────────
let mut builder = SessionStateBuilder::new()
    .with_default_features()
    .with_config(config);
builder.scalar_functions().get_or_insert_default()
    .push(my_scalar_udf()); // appended to the ~115 DF defaults
builder.aggregate_functions().get_or_insert_default()
    .push(my_aggregate_udaf()); // appended to the ~38 DF defaults
let state = builder.build(); // ~116 scalar + ~39 aggregate HashMap inserts
```
**After**
```rust
// ── Startup (once) ───────────────────────────────────────────────────────────
let mut scalar_functions =
    SessionStateDefaults::default_scalar_registry().as_ref().clone();
// my_scalar_udf returns None from with_updated_config
register_into_map(&mut scalar_functions, my_scalar_udf());
let scalar_functions = Arc::new(scalar_functions); // shadow: HashMap → Arc (frozen)

let mut aggregate_functions =
    SessionStateDefaults::default_aggregate_registry().as_ref().clone();
register_into_map(&mut aggregate_functions, my_aggregate_udaf());
let aggregate_functions = Arc::new(aggregate_functions); // shadow: HashMap → Arc (frozen)

// no custom window UDFs
let window_functions = SessionStateDefaults::default_window_registry();

// ── Per session ──────────────────────────────────────────────────────────────
// 3 Arc::clones: all 167 functions available, zero HashMap allocs
let state = SessionStateBuilder::new()
    .with_shared_scalar_functions(Arc::clone(&scalar_functions))
    .with_shared_aggregate_functions(Arc::clone(&aggregate_functions))
    .with_shared_window_functions(Arc::clone(&window_functions))
    .with_config(config)
    .build();
```
---
### Scenario 3 — Custom mutable (config-dependent) function
A caller provides a custom scalar UDF `format_timestamp` whose return type
depends on the session locale. It overrides `with_updated_config`, so the routing
signal sends it to the dynamic map automatically. In the new design the
`build()` loop runs only over this function and the 6 built-in prototypes, not
over all ~115 default scalar UDFs.
<table>
<tr>
<th>Before — per session</th>
<th>After — per session</th>
</tr>
<tr>
<td>
- Same as Scenario 1 before
- +1 `Arc::clone` (into Vec)
- +1 new `ScalarUDF` instance (from `with_updated_config`)
- +1 `HashMap::insert`
- +1 `String` alloc
- ❌ `with_updated_config` called on all ~115 scalar UDFs needlessly
</td>
<td>
- Same as Scenario 1 after
- +1 new `ScalarUDF` instance (from prototype)
- +1 `HashMap::insert`
- +1 `String` alloc
- ✅ `with_updated_config` called on 7 functions only (6 built-in + 1 custom)
</td>
</tr>
</table>
**Before**
```rust
// ── Startup ──────────────────────────────────────────────────────────────────
// (nothing)

// ── Per session ──────────────────────────────────────────────────────────────
// build() calls with_updated_config on all ~115 scalar UDFs
let mut builder = SessionStateBuilder::new()
    .with_default_features()
    .with_config(config);
builder.scalar_functions().get_or_insert_default().push(format_timestamp_udf());
let state = builder.build();
```
**After**
```rust
// ── Startup (once) ───────────────────────────────────────────────────────────
// default-config instance, stored on the runtime
let prototype = format_timestamp_udf();

// ── Per session ──────────────────────────────────────────────────────────────
// with_default_features() uses Arc::clone for static maps;
// build() loop runs over the 6 built-in prototypes plus this one; each gets
// Some and routes to the dynamic map
let mut builder = SessionStateBuilder::new()
    .with_default_features()
    .with_config(config);
builder.scalar_functions().get_or_insert_default().push(Arc::clone(&prototype));
let state = builder.build();
// format_timestamp is in scalar_functions_dynamic, baked with the session config
// SET statement refreshes it via with_updated_config
```
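How the small dynamic map could be derived from prototypes, both at session creation and again after a `SET`, is sketched below. Everything here is hypothetical: `Config` has a single `setting` field, `Udf::with_updated_config` only imitates the role of the real method, and `build_dynamic_map` is an illustrative helper name.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical session configuration with one relevant setting.
pub struct Config {
    pub setting: String,
}

/// Hypothetical config-dependent function: it bakes in one config value.
#[derive(Debug)]
pub struct Udf {
    pub name: &'static str,
    pub setting: String,
}

impl Udf {
    /// Imitates the role of `with_updated_config`: derive a session-specific
    /// instance from a prototype without modifying the prototype.
    pub fn with_updated_config(&self, config: &Config) -> Udf {
        Udf { name: self.name, setting: config.setting.clone() }
    }
}

/// Builds the per-session map from the prototypes. Calling it again with a
/// new config yields a refreshed map; the prototypes stay untouched.
pub fn build_dynamic_map(
    prototypes: &[Arc<Udf>],
    config: &Config,
) -> HashMap<String, Arc<Udf>> {
    prototypes
        .iter()
        .map(|p| (p.name.to_string(), Arc::new(p.with_updated_config(config))))
        .collect()
}
```

Because the map has only a handful of entries, rebuilding it wholesale on a config change stays cheap compared with rebuilding the full registries.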
---
## API Changes
### Internal struct changes
Private fields on `SessionState` and `TaskContext` change as follows. These
are not directly part of the public API but drive all downstream changes.
| Struct | Field | Before | After |
|---|---|---|---|
| `SessionState` | `scalar_functions` | `HashMap<String, Arc<ScalarUDF>>` | split into `scalar_functions_static: Arc<HashMap>` and `scalar_functions_dynamic: HashMap` |
| `SessionState` | `aggregate_functions` | `HashMap<String, Arc<AggregateUDF>>` | `Arc<HashMap<String, Arc<AggregateUDF>>>` |
| `SessionState` | `window_functions` | `HashMap<String, Arc<WindowUDF>>` | `Arc<HashMap<String, Arc<WindowUDF>>>` |
| `TaskContext` | `scalar_functions` | `HashMap<String, Arc<ScalarUDF>>` | same split as `SessionState` |
| `TaskContext` | `aggregate_functions` | `HashMap<String, Arc<AggregateUDF>>` | `Arc<HashMap<String, Arc<AggregateUDF>>>` |
| `TaskContext` | `window_functions` | `HashMap<String, Arc<WindowUDF>>` | `Arc<HashMap<String, Arc<WindowUDF>>>` |
| `SessionStateBuilder` | `build()` scalar loop | routes all scalar UDFs into one `HashMap` | routes via `with_updated_config` into static or dynamic map; freezes static into `Arc` |
### New public APIs (additive — no breaking change)
| Location | New symbol | Purpose |
|---|---|---|
| `SessionStateDefaults` | `default_scalar_registry() -> Arc<HashMap<String, Arc<ScalarUDF>>>` | `LazyLock`-backed process-wide static scalar registry (config-independent only). One `Arc::clone` per call. |
| `SessionStateDefaults` | `default_aggregate_registry() -> Arc<HashMap<String, Arc<AggregateUDF>>>` | Same for aggregates. |
| `SessionStateDefaults` | `default_window_registry() -> Arc<HashMap<String, Arc<WindowUDF>>>` | Same for window functions. |
| `SessionStateBuilder` | `with_shared_scalar_functions(Arc<HashMap>)` | Inject a pre-built static scalar registry. When set, `build()` skips the static construction loop entirely. |
| `SessionStateBuilder` | `with_shared_aggregate_functions(Arc<HashMap>)` | Same for aggregates. |
| `SessionStateBuilder` | `with_shared_window_functions(Arc<HashMap>)` | Same for window functions. |
### Breaking public API changes
> ℹ️ **Rationale for bundling all breaking changes in one PR:** the
mandatory breaks (`scalar_functions()` and `TaskContext::new()`) cannot be
avoided cleanly. Since the PR is already a breaking change, removing the three
`default_*_functions()` methods in the same PR avoids a separate deprecation PR
later and leaves the API in a cleaner final state. The three removals are **not
mandatory** — deprecation is a valid alternative path — but the cost of
bundling them now is low given the context.
> ⚠️ **Breaking surface is narrow** — normal query-serving code uses
`FunctionRegistry::udf(name)`, which is unchanged. Only code that iterates all
scalar functions, constructs `TaskContext` directly, or calls the
`default_*_functions()` helpers is affected.
| Location | Symbol | Mandatory? | Change | Migration |
|---|---|---|---|---|
| `SessionState` | `scalar_functions() -> &HashMap<…>` | 🔴 Mandatory | Cannot return a unified `&HashMap` across static and dynamic maps without allocation. Split into `static_scalar_functions()` and `dynamic_scalar_functions()`. | Update callers to use the appropriate getter. Most callers only need `FunctionRegistry::udf(name)` and are unaffected. |
| `TaskContext` | `scalar_functions() -> &HashMap<…>` | 🔴 Mandatory | Same split as `SessionState`. | Same as above. |
| `TaskContext` | `TaskContext::new(…)` | 🔴 Mandatory | Scalar parameter splits into two; aggregate and window parameters become `Arc<HashMap>`. | Update call sites (tests, custom executors) to pass the split parameters. |
| `SessionStateDefaults` | `default_scalar_functions() -> Vec<…>` | 🟡 Not mandatory | Semantically incompatible with the new design: returns config-dependent functions that are no longer in the static registry. Removing avoids a silent semantic trap for callers who expect all defaults. Replace with `default_scalar_registry()`. | Could be deprecated instead of removed. Recommended to remove given mandatory breaks already present. |
| `SessionStateDefaults` | `default_aggregate_functions() -> Vec<…>` | 🟡 Not mandatory | Semantically equivalent to `default_aggregate_registry()` but returns a `Vec` instead of an `Arc<HashMap>`. Replace with `default_aggregate_registry()`. | Could be deprecated instead of removed. Recommended to remove for consistency. |
| `SessionStateDefaults` | `default_window_functions() -> Vec<…>` | 🟡 Not mandatory | Same as `default_aggregate_functions()` above. Replace with `default_window_registry()`. | Could be deprecated instead of removed. Recommended to remove for consistency. |
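
For callers that currently iterate `scalar_functions()`, one possible migration is to chain the two getters instead of expecting a single `&HashMap`. The sketch below is an assumption-laden illustration: `State` and `Udf` are stand-ins, the two getter names follow the table above, and `all_scalar_functions` is a hypothetical convenience that is not part of the proposal.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for `ScalarUDF`.
#[derive(Debug)]
pub struct Udf {
    pub name: String,
}

/// Hypothetical stand-in for the split scalar storage.
pub struct State {
    pub scalar_static: Arc<HashMap<String, Arc<Udf>>>,
    pub scalar_dynamic: HashMap<String, Arc<Udf>>,
}

impl State {
    pub fn static_scalar_functions(&self) -> &HashMap<String, Arc<Udf>> {
        &self.scalar_static
    }

    pub fn dynamic_scalar_functions(&self) -> &HashMap<String, Arc<Udf>> {
        &self.scalar_dynamic
    }

    /// Hypothetical helper: an allocation-free view over both maps.
    /// It assumes the two key sets are disjoint, as the routing guarantees.
    pub fn all_scalar_functions(&self) -> impl Iterator<Item = (&String, &Arc<Udf>)> + '_ {
        self.static_scalar_functions()
            .iter()
            .chain(self.dynamic_scalar_functions().iter())
    }
}
```

An iterator avoids the allocation that a unified `&HashMap` would require, at the cost of losing direct keyed access, which `FunctionRegistry::udf(name)` already covers.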