LantaoJin opened a new pull request, #85:
URL: https://github.com/apache/datafusion-java/pull/85
## Which issue does this PR close?
- Closes #82.
## Rationale for this change
Multi-tenant DataFusion deployments need two operational signals that the
Java binding currently does not expose:
1. **Per-session memory.** `SessionContextBuilder.memoryLimit(...)` (PR #28)
caps the global pool, but if a tenant blows past their fair-share allocation
there is no way to attribute the bytes back to a session. Without per-session
attribution, fair-share scheduling, abuse detection, and OOM root-causing all
fall back to restarting the runtime.
2. **Tokio runtime stats.** The JNI library drives a single shared
multi-threaded Tokio runtime in `lib.rs`. Embedders that surface node-level
health -- e.g. an OpenSearch `_nodes/stats` endpoint -- need worker count, busy
time, queue depth, etc. Today they have to hand-roll a parallel native bridge.
Both share an FFI snapshot pattern: read a small struct of counters across
the boundary on demand. They are bundled here so the design conversation only
happens once.
## What changes are included in this PR?
Two new accessors on `SessionContext`, two new immutable POJOs:
```java
public final class MemoryUsage { long currentBytes(); long peakBytes(); }
public final class RuntimeStats {
int numWorkers();
long liveTasksCount(), globalQueueDepth(),
elapsedNanos(), totalBusyNanos(),
totalParkCount(), totalPollsCount(), totalNoopCount(),
totalStealCount(), totalLocalScheduleCount(), totalOverflowCount();
}
ctx.memoryUsage(); // always-on; thread-safe; pollable while queries run
ctx.runtimeStats(); // requires `runtime-metrics` Cargo feature
```
### Per-session memory tracking
`native/src/memory.rs` introduces `TrackingMemoryPool`, a thin wrapper
around any `Arc<dyn MemoryPool>` that intercepts `grow`/`try_grow`/`shrink` to
maintain two `AtomicU64` counters: total bytes currently held and the peak
observed since session creation. Pool semantics (limits, eviction, spilling)
are unchanged because `try_grow` still defers to the inner pool.
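The wrapper idea can be sketched in Java as a rough analogue. This is not the Rust code in `native/src/memory.rs`: the `Pool` interface, `CappedPool`, and `TrackingPool` below are hypothetical stand-ins, and `AtomicLong` takes the place of `AtomicU64`. The sketch only shows the shape of the technique: delegate every call to the inner pool, and update the current and peak counters only after the inner pool has accepted the reservation.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a memory pool; not a DataFusion type. */
interface Pool {
    void grow(long bytes);
    boolean tryGrow(long bytes); // false when the pool refuses the reservation
    void shrink(long bytes);
}

/** Hypothetical capped pool playing the role of the inner pool. */
final class CappedPool implements Pool {
    private final long limit;
    private final AtomicLong used = new AtomicLong();

    CappedPool(long limit) { this.limit = limit; }

    public void grow(long bytes) { used.addAndGet(bytes); }

    public boolean tryGrow(long bytes) {
        while (true) {
            long cur = used.get();
            if (cur + bytes > limit) return false;
            if (used.compareAndSet(cur, cur + bytes)) return true;
        }
    }

    public void shrink(long bytes) { used.addAndGet(-bytes); }
}

/** Wrapper that counts bytes held and the peak, leaving limits to the inner pool. */
final class TrackingPool implements Pool {
    private final Pool inner;
    private final AtomicLong current = new AtomicLong();
    private final AtomicLong peak = new AtomicLong();

    TrackingPool(Pool inner) { this.inner = inner; }

    public void grow(long bytes) {
        inner.grow(bytes);
        record(bytes);
    }

    public boolean tryGrow(long bytes) {
        // The inner pool still decides whether the reservation is allowed.
        if (!inner.tryGrow(bytes)) return false;
        record(bytes);
        return true;
    }

    public void shrink(long bytes) {
        inner.shrink(bytes);
        current.addAndGet(-bytes);
    }

    private void record(long bytes) {
        long now = current.addAndGet(bytes);
        peak.accumulateAndGet(now, Math::max); // peak never decreases
    }

    long currentBytes() { return current.get(); }
    long peakBytes() { return peak.get(); }
}
```

Because a refused `tryGrow` returns before the counters are touched, the tracked totals only reflect reservations the inner pool actually granted.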
The wrapper is layered on automatically by both `createSessionContext` and
`createSessionContextWithOptions` -- callers don't opt in. If
`SessionContextBuilder.memoryLimit(...)` configured a `GreedyMemoryPool` or
`TrackConsumersPool`, the tracker wraps that. If it didn't, the tracker wraps
DataFusion's default `UnboundedMemoryPool`.
The native snapshot path can't downcast `Arc<dyn MemoryPool>` back to the concrete
tracker type (the trait does not require `Any`), so a process-wide
`Mutex<HashMap<jlong, Arc<TrackingMemoryPool>>>` keyed by the JNI handle gives
it a way to find the right tracker. Entries are inserted at session create and
removed at session close; this adds no extra failure modes.
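The handle-keyed registry lifecycle can be illustrated with a small Java analogue. The PR's registry is a Rust `Mutex<HashMap<...>>`; this sketch substitutes a `ConcurrentHashMap`, and `SessionTracker` and `TrackerRegistry` are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-session counters; stands in for the tracker. */
final class SessionTracker {
    final AtomicLong currentBytes = new AtomicLong();
}

/** Process-wide map from session handle to that session's tracker. */
final class TrackerRegistry {
    private final Map<Long, SessionTracker> byHandle = new ConcurrentHashMap<>();

    /** Called when a session is created: register a fresh tracker. */
    SessionTracker onSessionCreate(long handle) {
        SessionTracker tracker = new SessionTracker();
        byHandle.put(handle, tracker);
        return tracker;
    }

    /** Snapshot path: find the tracker for a handle, if the session is open. */
    Optional<SessionTracker> lookup(long handle) {
        return Optional.ofNullable(byHandle.get(handle));
    }

    /** Called when a session is closed: drop the entry so the map cannot grow unbounded. */
    void onSessionClose(long handle) {
        byHandle.remove(handle);
    }

    int size() { return byHandle.size(); }
}
```

Removing the entry on close matters as much as inserting it on create: a long-lived process that opens many sessions would otherwise retain every tracker.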
**Per-session, not per-DataFrame.** A cross-engine survey (pandas / Polars /
Spark / DuckDB / DataFusion-Rust + Python) confirmed that no engine ships
per-DataFrame in-flight memory accounting. What pandas/Polars expose as
`memory_usage` / `estimated_size` is data-at-rest sizing of materialised
columns -- a different feature. Multi-tenant attribution in DataFusion is
conventionally one session per tenant, which matches the OpenSearch prior art
(`QueryMemoryPool` keyed off `context_id`). Per-DataFrame attribution would
need a side-channel registry hooked into operator-time consumer creation; not
blocked by this PR, can land later if requested.
### Tokio runtime metrics
`native/src/runtime_metrics.rs` is gated behind a default-off
`runtime-metrics` Cargo feature because `tokio-metrics` requires `--cfg
tokio_unstable` at build time. `tokio_metrics::RuntimeMonitor::intervals()` is
a delta iterator -- each `next()` returns metrics covering the period since the
previous call -- so the module owns a single process-wide `RuntimeAccumulator`
that maintains running totals for documented-monotonic fields. Snapshot
(point-in-time) fields (`workers_count`, `live_tasks_count`,
`global_queue_depth`) pass through without accumulation.
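The accumulate-versus-pass-through split can be sketched as follows. This is a Java illustration of the idea only, not the Rust `RuntimeAccumulator`: `IntervalDelta` and `RunningTotals` are hypothetical types, and just a handful of fields are modelled.

```java
/** Hypothetical per-interval delta; field names are illustrative. */
final class IntervalDelta {
    final int workersCount;      // point-in-time
    final long globalQueueDepth; // point-in-time
    final long elapsedNanos;     // delta since previous interval
    final long busyNanos;        // delta since previous interval
    final long parkCount;        // delta since previous interval

    IntervalDelta(int workersCount, long globalQueueDepth,
                  long elapsedNanos, long busyNanos, long parkCount) {
        this.workersCount = workersCount;
        this.globalQueueDepth = globalQueueDepth;
        this.elapsedNanos = elapsedNanos;
        this.busyNanos = busyNanos;
        this.parkCount = parkCount;
    }
}

/** Turns a stream of deltas into running totals plus the latest point-in-time values. */
final class RunningTotals {
    private int workersCount;
    private long globalQueueDepth;
    private long elapsedNanos;
    private long totalBusyNanos;
    private long totalParkCount;

    synchronized void absorb(IntervalDelta d) {
        // Point-in-time fields: keep only the most recent value.
        workersCount = d.workersCount;
        globalQueueDepth = d.globalQueueDepth;
        // Monotonic fields: sum the deltas into running totals.
        elapsedNanos += d.elapsedNanos;
        totalBusyNanos += d.busyNanos;
        totalParkCount += d.parkCount;
    }

    synchronized int workersCount() { return workersCount; }
    synchronized long globalQueueDepth() { return globalQueueDepth; }
    synchronized long elapsedNanos() { return elapsedNanos; }
    synchronized long totalBusyNanos() { return totalBusyNanos; }
    synchronized long totalParkCount() { return totalParkCount; }
}
```

Summing a point-in-time value such as queue depth would produce a meaningless number, which is why those fields are overwritten rather than accumulated.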
```toml
[features]
runtime-metrics = ["dep:tokio-metrics"]
[dependencies]
tokio-metrics = { version = "0.5", optional = true }
```
Build matrix:
| invocation | runtime-metrics | build prereqs |
|---|---|---|
| `cargo build` (default) | off (stub handler) | none |
| `RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics` | on | `--cfg tokio_unstable` |
The Java surface is unchanged either way -- `SessionContext.runtimeStats()`
is always present. If the feature was compiled off, the call throws a clear
error from the JVM: "datafusion-jni was built without the `runtime-metrics`
Cargo feature; rebuild the native crate with
`RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics`".
`SessionContextRuntimeStatsTest` detects this case and skips itself via JUnit's
`Assumptions.assumeFalse(...)`, so `make test` stays green either way.
A new `make native-runtime-metrics` target makes the opt-in build a
one-liner.
This is intentionally similar to PR #75's `substrait` feature handling: a
heavy / build-prereq-bearing dependency stays out of the default build, the
Java surface is unchanged, and a feature-off compile substitutes a stub handler
that throws clearly.
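An embedder that must run against both build flavours could wrap the accessor so that a feature-off build yields "no stats" instead of a failure. The sketch below is an assumption-heavy illustration: the PR text does not state which exception type the stub throws, so it assumes a `RuntimeException` whose message mentions `runtime-metrics`, and `OptionalStats.tryRead` is a hypothetical helper, not part of the binding.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class OptionalStats {
    private OptionalStats() {}

    /**
     * Runs the accessor and returns its value, or empty when the failure looks
     * like the feature-off stub. Assumes (not confirmed by the PR) that the stub
     * surfaces as a RuntimeException whose message names the Cargo feature.
     */
    static <T> Optional<T> tryRead(Supplier<T> accessor) {
        try {
            return Optional.of(accessor.get());
        } catch (RuntimeException e) {
            String msg = e.getMessage();
            if (msg != null && msg.contains("runtime-metrics")) {
                return Optional.empty(); // feature compiled off: report "no stats"
            }
            throw e; // anything else is a real error and should propagate
        }
    }
}
```

A caller would pass something like `ctx::runtimeStats` as the accessor. Matching on message text is fragile, so a dedicated exception type would be preferable if the binding provides one.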
## Are these changes tested?
Yes -- 9 new tests across `SessionContextMemoryUsageTest` and
`SessionContextRuntimeStatsTest`.
## Are there any user-facing changes?
Yes -- purely additive. New public API:
- `org.apache.datafusion.MemoryUsage` (immutable value class)
- `org.apache.datafusion.RuntimeStats` (immutable value class)
- `SessionContext.memoryUsage() -> MemoryUsage`
- `SessionContext.runtimeStats() -> RuntimeStats`
No API removals, no deprecations, no behavior change for existing callers.
The default `cargo build` does **not** pull in `tokio-metrics` and adds no new
build prerequisites. `SessionContext.memoryUsage()` is always available;
`runtimeStats()` is present but throws "feature not enabled" at runtime unless
rebuilt with the feature.