Hi all,

I would like to discuss a long-standing pain point for native execution
engines built on Spark, such as Gluten: *the memory held by the native side
cannot participate in Spark's storage memory accounting as a first-class
citizen*. I will summarize what has been tried, what is still missing, and a
concrete mechanism I'd like the community's feedback on.

1. The problem

When a native engine wants to cache columnar payloads next to Spark —
native vectors, Arrow buffers, native file caches, etc. — the memory lives
outside the JVM heap and is allocated through a separate allocator. Two
things go wrong today:

*(a) The two pools don't share a budget.* The native allocator's budget is
configured independently of Spark's memory settings (spark.memory.fraction,
spark.memory.storageFraction, spark.memory.offHeap.size). As a result, the
user has to statically partition memory between Spark and the native engine,
and neither side can borrow from the other under pressure. In practice, the
native cache spills aggressively while Spark storage sits idle, or vice versa.

*(b) Even when df.persist() is used to cache native results, the payload is
copied through the JVM.* The columnar cache SPI today (CachedBatchSerializer)
requires the produced CachedBatch to live as JVM-owned bytes so that
MemoryStore can hold it as a DeserializedMemoryEntry / SerializedMemoryEntry.
For any native engine, this means serializing each batch from native memory
into a JVM byte[] on the write side and deserializing back into native
memory on the read side. Each batch thus pays an off-heap -> on-heap copy
when it is cached and an on-heap -> off-heap copy on every cache hit, plus
any associated cross-boundary call overhead. The point of a native engine,
staying off-heap and zero-copy, is defeated at the cache boundary.

2. Prior attempts and why they fell short

I want to credit two prior efforts that explored adjacent designs:

*SPARK-48694 <https://issues.apache.org/jira/browse/SPARK-48694> /
PR #47067 <https://github.com/apache/spark/pull/47067>* introduced an
ExternalMemoryStore parallel to MemoryStore, sharing the storage pool
budget, with three new configs (spark.memory.external.cache.enabled,
spark.memory.external.storageFraction,
spark.memory.storage.preferEvictExtCache). The eviction policy under
pressure is a *static global boolean* — always prefer one store over the
other — rather than unified per-block LRU. It also duplicates a fair amount
of MemoryStore infrastructure into a parallel surface, and is shaped around
the "file cache" use case rather than a general external-object lifecycle.
The PR was eventually closed as stale.

*SPARK-56918 <https://issues.apache.org/jira/browse/SPARK-56918> /
PR #55953 <https://github.com/apache/spark/pull/55953> (myself, a few days
ago)* went a different way: I added a ManagedConsumer SPI to
UnifiedMemoryManager, by analogy with UnmanagedConsumer. The native side
registers as a consumer, Spark calls back to shrink it. Two problems
surfaced in review:

   1. *The accounting is not symmetric.* Consumers are an execution-memory
   concept; they are asked to spill under execution pressure. Storage blocks
   have their own LRU within the storage pool. Bolting a "managed consumer"
   onto storage means the native side is always the first to be reclaimed
   under any pressure — there is no fair LRU between native and JVM entries.
   2. *It does not address the serdes problem.* Consumers register a
   *budget*; they don't give Spark anything to evict at batch granularity.
   df.persist() still has to go through the JVM byte path.

Neither attempt gives the native engine a way to say *"I have this opaque
payload, please count it against the storage pool and tell me when to
release it."*

3. What I'd like to propose

The core idea is small: *let BlockManager register and evict blocks whose
bytes live outside the JVM*, then build the cache SPI on top.

Layer 1: a "native block" primitive in core

A new BlockManager entry point lets a caller register an executor-local
block with three pieces of information:

   - a new NativeBlockId subtype, so the block is unambiguously
   distinguishable from JVM-resident blocks;
   - the number of bytes to reserve in the storage memory pool (on- or
   off-heap, the caller's choice);
   - a release() callback that Spark invokes when the block is evicted or
   removed.

If storage pressure forces eviction, the existing
MemoryStore.evictBlocksToFreeSpace algorithm picks across *all* entries —
JVM-resident and native — using the same LRU policy. No preference flag, no
parallel store. When a native entry is chosen, release() is invoked exactly
once and the reserved bytes return to the pool.
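To make the unified-LRU claim concrete, here is a toy, self-contained model in Scala, assuming a single storage pool with a fixed byte budget. `ToyMemoryStore`, `JvmEntry` and `NativeEntry` are hypothetical names, not the proposed API, and the loop leaves out block locks, memory modes and everything else the real `evictBlocksToFreeSpace` handles. The access-ordered `java.util.LinkedHashMap` mirrors the structure MemoryStore keeps its entries in, so iteration visits the least recently used block first, whatever its kind.

```scala
import java.util.LinkedHashMap
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical entry kinds sharing one storage budget.
sealed trait Entry { def size: Long }
final case class JvmEntry(size: Long) extends Entry
final class NativeEntry(val size: Long, releaseFn: () => Unit) extends Entry {
  private val released = new AtomicBoolean(false)
  // Runs the engine's callback at most once, even if eviction and removal race.
  def release(): Unit = if (released.compareAndSet(false, true)) releaseFn()
}

final class ToyMemoryStore(capacity: Long) {
  // accessOrder = true: iteration starts at the least recently used entry.
  private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
  private var used = 0L

  def usedBytes: Long = used
  def contains(id: String): Boolean = entries.containsKey(id)
  // A read; with access ordering this moves the entry to the most-recently-used end.
  def touch(id: String): Unit = { entries.get(id); () }

  // Evicts LRU entries of either kind until `needed` more bytes fit. No per-kind preference.
  private def evictToFit(needed: Long): Unit = {
    val it = entries.entrySet().iterator()
    while (used + needed > capacity && it.hasNext) {
      val victim = it.next()
      it.remove()
      used -= victim.getValue.size
      victim.getValue match {
        case n: NativeEntry => n.release() // engine frees its off-heap bytes
        case _: JvmEntry    => ()          // JVM bytes are left to the GC
      }
    }
  }

  def put(id: String, entry: Entry): Boolean = {
    require(!entries.containsKey(id), s"duplicate put of $id")
    if (entry.size > capacity) false
    else {
      evictToFit(entry.size)
      entries.put(id, entry)
      used += entry.size
      true
    }
  }
}
```

If a JVM block is read after a native one was cached, the next insertion under pressure evicts the native entry first and fires its callback exactly once; the same would happen to a JVM entry in the opposite order.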

Read paths (getLocalValues, getLocalBytes, getLocalBlockData, get,
replicateBlock) refuse native blocks with a clear error: the payload is
opaque to Spark and must be read through the owning engine's own API. The
master is never notified, because native payloads are inherently
executor-local and cannot be fetched over the network.

So BlockManager becomes a unified registry for "this many bytes of storage
are committed; call me back when you need them released," regardless of who
owns the actual bytes.

Layer 2: a native variant of the columnar cache SPI

On top of layer 1, CachedBatchSerializer grows a small opt-in:

   - a flag indicating that the serializer produces batches whose payload
   is owned by an external engine;
   - a NativeCachedBatch trait carrying release() plus a callback that
   receives the NativeBlockId Spark assigns;
   - a lightweight handle type (block id + row count + size + optional
   column stats) that flows through the cached RDD in place of the original
   payload.
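One possible shape for the layer-2 opt-in, sketched in Scala. `NativeCachedBatch`, `onBlockIdAssigned`, `NativeBatchHandle`, `producesNativeBatches` and `ColumnarCacheSerializerLike` are illustrative names only; the real change would extend Spark's `CachedBatchSerializer` rather than define a new trait, and the handle would carry stats in the `SimpleMetricsCachedBatch` layout instead of the simplified map used here.

```scala
// Hypothetical opt-in surface; names are illustrative only.
trait NativeCachedBatch {
  def numRows: Int
  def sizeInBytes: Long
  // Spark tells the engine which block id it assigned to this batch.
  def onBlockIdAssigned(blockId: String): Unit
  // Invoked by Spark exactly once, on eviction or removal.
  def release(): Unit
}

// What flows through the cached RDD: a few fields instead of the payload.
final case class NativeBatchHandle(
    blockId: String,
    numRows: Int,
    sizeInBytes: Long,
    columnBounds: Option[Map[String, (Long, Long)]]) // simplified (lower, upper) stats

trait ColumnarCacheSerializerLike {
  // Opt-in flag; the default keeps today's JVM-bytes path.
  def producesNativeBatches: Boolean = false
}

// A fake engine batch, as a test serializer might produce.
final class FakeNativeBatch(val numRows: Int, val sizeInBytes: Long) extends NativeCachedBatch {
  var assignedId: Option[String] = None
  var releaseCount = 0
  def onBlockIdAssigned(blockId: String): Unit = { assignedId = Some(blockId) }
  def release(): Unit = { releaseCount += 1 }
  def toHandle: NativeBatchHandle = {
    val id = assignedId.getOrElse(throw new IllegalStateException("block id not assigned yet"))
    NativeBatchHandle(id, numRows, sizeInBytes, columnBounds = None)
  }
}

object FakeNativeSerializer extends ColumnarCacheSerializerLike {
  override def producesNativeBatches: Boolean = true
}
```

Keeping the flag defaulted to `false` means existing serializers compile and behave unchanged.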

When the flag is set, the cache builder takes a different fork. For each
batch produced by the serializer:

   1. Spark reserves storage memory via the layer-1 entry point under a
   deterministic id.
   2. The serializer is informed of the id, so it can populate its own
   executor-local lookup table mapping id back to the engine payload.
   3. The Spark-visible CachedBatch is just the handle; the handle RDD is
   persisted at MEMORY_ONLY (handles are tiny).
   4. On scan, the serializer's convertCachedBatchTo* methods resolve the
   handle's block id via the lookup table and hand back the native payload
   directly.
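Steps 1 to 4 for a single partition could look roughly like the sketch below. Everything here is hypothetical: `ToyStoragePool` stands in for the layer-1 reservation, `EngineSerializerSketch` for an engine's serializer with its executor-local lookup table, and the `native_<rdd>_<partition>_<batch>` format is just one deterministic id choice. Rolling back the whole partition when a reservation fails is also an assumption, made so that a partially cached partition is never exposed.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

// Opaque engine payload standing in for native vectors (hypothetical).
final class EnginePayload(val rows: Int, val bytes: Long)

final case class Handle(blockId: String, numRows: Int, sizeInBytes: Long)

// Stand-in for the layer-1 storage reservation.
final class ToyStoragePool(initialFree: Long) {
  private var freeBytes = initialFree
  def free: Long = freeBytes
  def reserve(n: Long): Boolean = if (n <= freeBytes) { freeBytes -= n; true } else false
  def unreserve(n: Long): Unit = freeBytes += n
}

// Engine side: executor-local table from Spark-assigned id to payload.
object EngineSerializerSketch {
  val lookup = new ConcurrentHashMap[String, EnginePayload]()
  def onBlockIdAssigned(id: String, p: EnginePayload): Unit = { lookup.put(id, p); () }
  def release(id: String): Unit = { lookup.remove(id); () }
  // Step 4: resolve the handle to the payload; nothing is copied through the JVM.
  def convertCachedBatchToPayload(h: Handle): Option[EnginePayload] = Option(lookup.get(h.blockId))
}

object CacheBuilderSketch {
  // Steps 1-3 for one partition; None means "not cached, recompute on scan".
  def cachePartition(rddId: Int, partition: Int, batches: Seq[EnginePayload],
                     pool: ToyStoragePool): Option[Seq[Handle]] = {
    val handles = ArrayBuffer.empty[Handle]
    val it = batches.iterator.zipWithIndex
    var ok = true
    while (ok && it.hasNext) {
      val (p, i) = it.next()
      val id = s"native_${rddId}_${partition}_$i" // deterministic id
      if (pool.reserve(p.bytes)) {                    // step 1: reserve storage memory
        EngineSerializerSketch.onBlockIdAssigned(id, p) // step 2: engine records id -> payload
        handles += Handle(id, p.rows, p.bytes)        // step 3: tiny handle for the MEMORY_ONLY RDD
      } else ok = false
    }
    if (ok) Some(handles.toList)
    else {
      // Roll back so no partially cached partition is ever visible.
      handles.foreach { h => EngineSerializerSketch.release(h.blockId); pool.unreserve(h.sizeInBytes) }
      None
    }
  }
}
```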

Because the handle still carries column statistics in the existing
SimpleMetricsCachedBatch schema, partition pruning continues to work
without changes to the cache scan operator.
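Pruning only needs the handle, never the payload. A reduced sketch, assuming a single Long column whose lower/upper bounds are carried on the handle (the real `SimpleMetricsCachedBatch` stats row holds more fields per column); `StatsHandle` and `PruningSketch` are hypothetical:

```scala
// Hypothetical handle with bounds for a single Long column.
final case class StatsHandle(blockId: String, numRows: Int, lower: Long, upper: Long)

object PruningSketch {
  // A handle is skipped when its [lower, upper] range cannot satisfy the predicate,
  // so the engine's lookup table is never consulted for it.
  def pruneGreaterThan(hs: Seq[StatsHandle], v: Long): Seq[StatsHandle] =
    hs.filter(_.upper > v)

  def pruneEqualTo(hs: Seq[StatsHandle], v: Long): Seq[StatsHandle] =
    hs.filter(h => h.lower <= v && v <= h.upper)
}
```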

The cache-eviction story is now closed-loop: Spark LRU picks the oldest
entry (JVM or native), MemoryStore calls the registered release(), the
native engine frees its bytes, and the next scan recomputes the affected
partitions.

4. Why this is more general than the previous attempts

   - *Unified LRU, not a preference flag.* Native and JVM blocks live in
   the same entries map; the existing eviction algorithm handles them
   uniformly. No new tunable for users to misconfigure.
   - *No new configs.* It reuses
   spark.memory.{offHeap.enabled,offHeap.size,fraction,storageFraction} exactly
   as today.
   - *No MemoryStore clone.* Only a new entry variant alongside the
   existing two — small surface, same lifecycle hooks.
   - *Not bound to a single use case.* "Native file cache" and "native
   columnar cache" are both just consumers of the same core primitive.
   Anything with an opaque, off-heap payload and release() semantics fits.
   - *Solves the serdes problem.* Native engines can opt into the SPI and
   have df.persist(MEMORY_ONLY) stay zero-copy end to end. The columnar
   cache SPI stops being a forced serialization boundary.

5. Trade-offs to discuss

A few things worth surfacing:

   - *No replication, no spill, no remote fetch.* Native blocks are
   executor-local and opaque to Spark; the SPI explicitly rejects storage
   levels that request disk or replication > 1. Engines that want spill or
   replication have to implement it themselves on top.
   - *Dataset.cache() default is MEMORY_AND_DISK*, which the native path
   would reject. Users must call persist(MEMORY_ONLY) explicitly. We could
   relax this with a silent downgrade, or expose a new canonical storage-level
   constant. I lean toward "be explicit" for the initial cut, but I'm happy to
   hear opinions.
   - *Cluster-mode unpersist cleanup is best-effort.* Today the cache
   builder only touches the driver's local BlockManager; remote executors
   will release their native blocks via LRU or shutdown. A driver -> executor
   cleanup RPC is a natural follow-up.
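The two options for the MEMORY_AND_DISK default (reject vs. silent downgrade) differ in a single branch. In this sketch `Level` is a local stand-in mirroring the relevant fields of Spark's `StorageLevel` (`useDisk`, `useMemory`, `useOffHeap`, `deserialized`, `replication`) so the code runs without spark-core; `NativeLevelCheck.validate` and its `strict` parameter are hypothetical.

```scala
// Local mirror of the StorageLevel fields that matter here.
final case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                       deserialized: Boolean, replication: Int)

object Levels {
  val MEMORY_ONLY = Level(useDisk = false, useMemory = true, useOffHeap = false, deserialized = true, replication = 1)
  val MEMORY_AND_DISK = MEMORY_ONLY.copy(useDisk = true)
  val MEMORY_ONLY_2 = MEMORY_ONLY.copy(replication = 2)
}

object NativeLevelCheck {
  // strict = true: "be explicit" and reject; strict = false: silently downgrade.
  def validate(level: Level, strict: Boolean): Either[String, Level] =
    if (!level.useMemory) Left(s"native cache requires a memory storage level, got $level")
    else if (!level.useDisk && level.replication == 1) Right(level)
    else if (strict) Left(s"native cache supports neither disk nor replication > 1, got $level")
    else Right(level.copy(useDisk = false, replication = 1))
}
```

With `strict = false`, `Dataset.cache()` would quietly behave like `persist(MEMORY_ONLY)`; the strict variant surfaces the mismatch at persist time instead.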

6. What I have ready

I have a 3-commit stack on top of master that I'd like to put up for review
once a JIRA umbrella is filed:

   1. [CORE] Add native block storage for opaque external-engine payloads —
   the new BlockId subtype, the MemoryStore variant, the BlockManager entry
   point, and tests covering reservation, eviction, release-on-evict, and
   release-throws.
   2. [CORE] Harden native block storage — type enforcement, duplicate-put
   detection, replicate/master guards.
   3. [SQL] Wire the SPI into the columnar cache builder, with end-to-end
   tests using a fake native serializer that exercises the round trip through
   df.persist(MEMORY_ONLY) and eviction.

7. What I'm asking

   1. Is the community OK with BlockManager growing a "register external
   payload" surface? I think this is the right home for it — the alternative
   is yet another sidecar registry — but I want to confirm before pushing PRs.
   2. Reviewers familiar with MemoryStore / UnifiedMemoryManager /
   CachedBatchSerializer who can weigh in on the API shape: memory-mode
   choice, lock semantics inside release(), the registration callback, and
   how strict the disk / replication rejection should be.
   3. Anything I'm missing from a prior discussion that already converged
   on a different direction.

Thanks for reading. I'd love to hear what people think before I take this
further.

Best, Chang Chen
