avantgardnerio commented on code in PR #22626:
URL: https://github.com/apache/datafusion/pull/22626#discussion_r3327381602


##########
datafusion/sqllogictest/src/accounting.rs:
##########
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Allocation-budget `GlobalAlloc` wrapper.
+//!
+//! The bank is a *budget*, not a counter. Allocations *debit* the bank,
+//! deallocations *credit* it. [`init_bank`] sets the starting budget.
+//! Going below zero is an overdraft — detectable via [`is_overdrawn`] or
+//! by reading [`bank_balance`] directly.
+//!
+//! Every `alloc`/`realloc`/`dealloc` updates a thread-local `isize` (also
+//! a balance — debit on alloc, credit on free). When that local's magnitude
+//! crosses [`SETTLE_THRESHOLD`] it transfers into the global bank with one
+//! atomic op and the local zeros out. This amortizes cache-line contention
+//! across thousands of allocations.
+//!
+//! [`bank_balance`] reads the global bank with a single relaxed atomic load.
+//! It lags reality by up to `SETTLE_THRESHOLD * num_threads` (≈1 MB on a
+//! 16-core box) — fine for catching multi-GB drift.
+//!
+//! # Enforcement
+//!
+//! By default the allocator only *counts* — overdrafts set the sticky
+//! [`is_overdrawn`] flag but allocations continue. Code that wants the
+//! allocator to actively kill its in-flight work wraps its futures in
+//! [`kill_on_overdraft`]. While such a future is being polled, the very
+//! next debit after the bank goes negative panics that future with an
+//! [`OverdraftPanic`] payload. Other futures (system tasks, HTTP handlers)
+//! polled on the same worker thread are unaffected — they're not inside
+//! the scope, so `track()` skips the check.
+//!
+//! Compiled in only when the `memory-accounting` feature is on.
+
+use std::alloc::{GlobalAlloc, Layout, System};
+use std::cell::Cell;
+use std::future::Future;
+use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+
+/// Net byte change at which a thread flushes its local count into the bank.
+/// 64 KB chosen to keep per-thread drift tight (≤1 MB on a 16-core box) while
+/// still settling rarely enough to make the bank's atomic op amortized-free.
+const SETTLE_THRESHOLD: isize = 64 * 1024;
+
+static BANK: AtomicIsize = AtomicIsize::new(0);
+
+/// Last value passed to [`init_bank`]. Used by [`bytes_used`] to express
+/// "how much has been allocated since the budget was set" as a positive
+/// number, which is more readable than a deeply-negative balance.
+static INITIAL_BUDGET: AtomicIsize = AtomicIsize::new(0);
+
+/// Sticky flag: set the moment any thread's settle observes the bank crossing
+/// below zero. Cleared by [`init_bank`] or [`clear_overdraft`]. Reads are
+/// a single relaxed atomic load.
+static OVERDRAWN: AtomicBool = AtomicBool::new(false);
+
+thread_local! {
+    static LOCAL: Cell<isize> = const { Cell::new(0) };
+}
+
+tokio::task_local! {
+    /// Active iff the current task is inside a [`kill_on_overdraft`] scope.
+    /// Holding a `Cell<bool>` lets us track "we already started panicking"
+    /// without a separate thread-local, since the cell is task-scoped — it
+    /// disappears when the future unwinds past the scope, so the next task
+    /// on the same worker thread sees no enforcement.
+    static KILL_GUARD: Cell<bool>;
+}
+
+/// Payload attached to allocator-induced panics. Catch with:
+///
+/// ```ignore
+/// match std::panic::catch_unwind(|| { /* ... */ }) {
+///     Err(e) if e.is::<OverdraftPanic>() => { /* it was an overdraft */ }
+///     ...
+/// }
+/// ```
+#[derive(Debug, Clone)]
+pub struct OverdraftPanic {
+    /// Bank balance at the moment the panic fired (negative — that's the point).
+    pub bank_balance: isize,
+}
+
+/// Wrap a future so the allocator may panic it on overdraft.
+///
+/// While `f` is being polled, any allocation that finds the bank overdrawn
+/// causes an immediate `panic_any(OverdraftPanic)` on the polling thread.
+/// Drop runs of in-flight values during the unwind do not re-panic. After
+/// unwinding past the scope, the worker thread reverts to the default
+/// "count, but don't kill" mode, so any non-marked futures it polls next
+/// (system tasks, hyper handlers, etc.) are unaffected.
+pub async fn kill_on_overdraft<F: Future>(f: F) -> F::Output {
+    KILL_GUARD.scope(Cell::new(false), f).await
+}
+
+/// Set the bank to `value`, remember it as the budget for [`bytes_used`],
+/// and clear the overdraft flag. Idempotent; safe to call from any thread.
+///
+/// Typical use: call once early in `main` to preload a memory budget before
+/// significant allocation occurs. Note that per-thread locals are NOT zeroed,
+/// so calling this mid-program leaves prior unsettled deltas in the locals
+/// that will eventually flush into the bank.
+pub fn init_bank(value: isize) {
+    INITIAL_BUDGET.store(value, Ordering::Relaxed);
+    BANK.store(value, Ordering::Relaxed);
+    OVERDRAWN.store(false, Ordering::Relaxed);
+}
+
+/// Bytes allocated since the last [`init_bank`] — i.e., `budget - bank_balance`.
+/// Always positive (or zero) under normal conditions; only meaningful relative
+/// to the most recent `init_bank` call.
+pub fn bytes_used() -> isize {
+    INITIAL_BUDGET.load(Ordering::Relaxed) - BANK.load(Ordering::Relaxed)
+}
+
+/// Reset the bank to the budget last passed to [`init_bank`] and clear the
+/// overdraft flag. No-op if `init_bank` has never been called (budget = 0).
+///
+/// Intended to be called between independent units of work — e.g., between
+/// SQL statements — so each gets a fresh budget regardless of whether the
+/// previous one drifted the bank or tripped overdraft.
+pub fn reset_bank() {
+    let budget = INITIAL_BUDGET.load(Ordering::Relaxed);
+    BANK.store(budget, Ordering::Relaxed);
+    OVERDRAWN.store(false, Ordering::Relaxed);
+}
+
+/// Cross-module config for DataFusion's voluntary `MemoryPool` limit, set
+/// from the SLT binary's CLI and read by test_context when building each
+/// per-file `RuntimeEnv`. Zero means "use the default `UnboundedMemoryPool`".
+static MEMORY_TRACKER_LIMIT: AtomicUsize = AtomicUsize::new(0);
+
+/// Set the size (in bytes) the per-file `MemoryPool` should be built with.
+/// Zero (the default) leaves the existing `UnboundedMemoryPool` behavior.
+pub fn set_memory_tracker_limit(bytes: usize) {
+    MEMORY_TRACKER_LIMIT.store(bytes, Ordering::Relaxed);
+}
+
+/// Current `MemoryPool` limit configured via [`set_memory_tracker_limit`].
+pub fn memory_tracker_limit() -> usize {
+    MEMORY_TRACKER_LIMIT.load(Ordering::Relaxed)
+}
+
+/// True iff any settle has ever seen the bank go below zero since the last
+/// [`init_bank`] / [`clear_overdraft`]. Single relaxed atomic load.
+pub fn is_overdrawn() -> bool {
+    OVERDRAWN.load(Ordering::Relaxed)
+}
+
+/// Clear the sticky overdraft flag without touching the bank.
+pub fn clear_overdraft() {
+    OVERDRAWN.store(false, Ordering::Relaxed);
+}
+
+/// Current bank balance. Allocs debit, deallocs credit, negative = overdraft.
+/// Single relaxed atomic load; cheap to call from any thread.
+///
+/// Lags reality by up to `SETTLE_THRESHOLD * num_threads`. To get an exact
+/// snapshot, drain each thread's local first via [`settle_thread_local`] —
+/// usually overkill.
+pub fn bank_balance() -> isize {
+    BANK.load(Ordering::Relaxed)
+}
+
+/// Current thread's local balance — not yet reflected in the global bank.
+/// Always in `(-SETTLE_THRESHOLD, +SETTLE_THRESHOLD)`. Sign matches the bank:
+/// negative on a thread that's net-allocated, positive on one that's net-freed.
+pub fn local_balance() -> isize {
+    LOCAL.with(|c| c.get())
+}
+
+/// Force the current thread to flush its local count into the bank.
+pub fn settle_thread_local() {
+    let _ = LOCAL.try_with(|c| {
+        let v = c.replace(0);
+        if v != 0 {
+            BANK.fetch_add(v, Ordering::Relaxed);
+        }
+    });
+}
+
+/// Record a delta against the current thread's local; flush to bank if we've
+/// crossed `±SETTLE_THRESHOLD`; on a debit, possibly kill the task. Inlines
+/// into the alloc hot path.
+#[inline(always)]
+fn track(delta: isize) {
+    let _ = LOCAL.try_with(|c| {
+        let new = c.get() + delta;

Review Comment:
   We could also check `KILL_GUARD` here and only account for memory from 
DataFusion threads, but I lean towards "I want to know if the process will die" 
more than "is DataFusion responsible for killing this process".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to