Hi all,

I would like to discuss a long-standing pain point for native execution engines built on Spark, such as Gluten: *memory held by the native side cannot participate in Spark's storage memory accounting as a first-class citizen*. Below I summarize what has been tried, what is still missing, and a concrete mechanism I'd like the community's feedback on.

1. The problem
When a native engine wants to cache columnar payloads next to Spark (native vectors, Arrow buffers, native file caches, etc.), the memory lives outside the JVM heap and is allocated through a separate allocator. Two things go wrong today:

*(a) The two pools don't share a budget.* The native allocator's budget is configured independently of Spark's storage memory settings (spark.memory.storageFraction, spark.memory.offHeap.size). As a result, users have to statically partition memory between Spark and the native engine, and neither side can borrow from the other under pressure. In practice, the native cache spills aggressively while Spark storage sits idle, or vice versa.

*(b) Even when df.persist() is used to cache native results, the payload is copied through the JVM.* The columnar cache SPI today (CachedBatchSerializer) requires the produced CachedBatch to live as JVM-owned bytes so that MemoryStore can hold it as a DeserializedMemoryEntry / SerializedMemoryEntry. For any native engine, this means serializing each batch from native memory into a JVM byte[] on the write side and deserializing it back into native memory on the read side. Every cached batch pays for an off-heap -> on-heap copy when it is written, and every cache hit pays for an on-heap -> off-heap copy back, plus any associated cross-boundary call overhead. The whole point of a native engine, staying off-heap and zero-copy, is defeated at the cache boundary.

2. Prior attempts and why they fell short

I want to credit two prior efforts that explored adjacent designs:

*SPARK-48694 <https://issues.apache.org/jira/browse/SPARK-48694> / PR #47067 <https://github.com/apache/spark/pull/47067>* introduced an ExternalMemoryStore parallel to MemoryStore, sharing the storage pool budget, with three new configs (spark.memory.external.cache.enabled, spark.memory.external.storageFraction, spark.memory.storage.preferEvictExtCache). Under pressure, the eviction policy is a *static global boolean* (always prefer evicting one store over the other) rather than a unified per-block LRU.
It also duplicates a fair amount of MemoryStore infrastructure into a parallel surface, and is shaped around the "file cache" use case rather than a general external-object lifecycle. The PR was eventually closed as stale.

*SPARK-56918 <https://issues.apache.org/jira/browse/SPARK-56918> / PR #55953 <https://github.com/apache/spark/pull/55953> (mine, from a few days ago)* went a different way: I added a ManagedConsumer SPI to UnifiedMemoryManager, by analogy with UnmanagedConsumer. The native side registers as a consumer, and Spark calls back to ask it to shrink. Two problems surfaced in review:

1. *The accounting is not symmetric.* Consumers are an execution-memory concept: they are asked to spill under execution pressure. Storage blocks have their own LRU within the storage pool. Bolting a "managed consumer" onto storage means the native side is always the first to be reclaimed under any pressure; there is no fair LRU between native and JVM entries.
2. *It does not address the serdes problem.* Consumers register a *budget*; they don't give Spark anything to evict at batch granularity. df.persist() still has to go through the JVM byte path.

Neither attempt gives the native engine a way to say *"I have this opaque payload; please count it against the storage pool and tell me when to release it."*

3. What I'd like to propose

The core idea is small: *let BlockManager register and evict blocks whose bytes live outside the JVM*, then build the cache SPI on top.

Layer 1: a "native block" primitive in core

A new BlockManager entry point lets a caller register an executor-local block with three pieces of information:
- a new NativeBlockId subtype, so the block is unambiguously distinguishable from JVM-resident blocks;
- the number of bytes to reserve in the storage memory pool (on- or off-heap, at the caller's choice);
- a release() callback that Spark invokes when the block is evicted or removed.
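To make the Layer 1 contract concrete, here is a toy model in Scala. It is illustrative only, built on stated assumptions: ToyStorageStore, JvmBlockId, NativeEntry, registerNative and getValues are hypothetical names rather than Spark APIs, the store has a single capacity (no on-/off-heap split), and it leaves out the locking and victim-selection details of the real MemoryStore. It shows the three inputs a caller supplies, JVM and native entries sharing one LRU order (as the eviction paragraph below describes), release() firing at most once, duplicate puts being rejected, and local reads of native blocks being refused.

```scala
// Illustrative toy model of the proposed Layer 1 contract. None of these
// names are Spark APIs; a single capacity stands in for one memory mode.
sealed trait BlockId
final case class JvmBlockId(name: String) extends BlockId
final case class NativeBlockId(rddId: Int, splitIndex: Int, batchIndex: Int) extends BlockId

sealed trait Entry { def size: Long }
final case class JvmEntry(size: Long) extends Entry
final class NativeEntry(val size: Long, release: () => Unit) extends Entry {
  private var released = false
  // Ensures the engine's callback runs at most once for this entry.
  def releaseOnce(): Unit = if (!released) { released = true; release() }
}

final class ToyStorageStore(capacity: Long) {
  // accessOrder = true: iteration goes from least to most recently used.
  private val entries = new java.util.LinkedHashMap[BlockId, Entry](16, 0.75f, true)
  private var used = 0L

  def usedBytes: Long = used

  def putJvm(id: JvmBlockId, size: Long): Boolean = put(id, JvmEntry(size))

  // The three inputs from the proposal: a NativeBlockId, bytes to reserve, a release() callback.
  def registerNative(id: NativeBlockId, size: Long, release: () => Unit): Boolean =
    put(id, new NativeEntry(size, release))

  def remove(id: BlockId): Unit = Option(entries.remove(id)).foreach(drop)

  // Native payloads are opaque to Spark, so local reads are refused outright.
  def getValues(id: BlockId): Option[Entry] = entries.get(id) match {
    case null           => None
    case _: NativeEntry => throw new UnsupportedOperationException(s"$id is native; read it via its engine")
    case e              => Some(e)
  }

  private def put(id: BlockId, entry: Entry): Boolean = {
    require(!entries.containsKey(id), s"duplicate put of $id")
    if (entry.size > capacity) return false
    evictToFree(entry.size - (capacity - used))
    entries.put(id, entry)
    used += entry.size
    true
  }

  // One LRU walk over JVM and native entries alike; no per-store preference flag.
  private def evictToFree(needed: Long): Unit = {
    var remaining = needed
    val it = entries.entrySet().iterator()
    while (remaining > 0 && it.hasNext) {
      val victim = it.next().getValue
      it.remove()
      remaining -= victim.size
      drop(victim)
    }
  }

  private def drop(e: Entry): Unit = {
    used -= e.size
    e match {
      case n: NativeEntry => n.releaseOnce() // engine frees its bytes
      case _: JvmEntry    => ()              // a real store would drop or spill the JVM block
    }
  }
}
```

The point of the toy is that the eviction loop never asks which kind of entry it is looking at until after the victim is chosen; only the cleanup differs.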
If storage pressure forces eviction, the existing MemoryStore.evictBlocksToFreeSpace algorithm picks victims across *all* entries, JVM-resident and native alike, using the same LRU policy. No preference flag, no parallel store. When a native entry is chosen, release() is invoked exactly once and the reserved bytes return to the pool.

Read paths (getLocalValues, getLocalBytes, getLocalBlockData, get, replicateBlock) refuse native blocks with a clear error: the payload is opaque to Spark and must be read through the owning engine's own API. The master is never notified, because native payloads are inherently executor-local and cannot be fetched over the network.

So BlockManager becomes a unified registry for "this many bytes of storage are committed; call me back when you need them released," regardless of who owns the actual bytes.

Layer 2: a native variant of the columnar cache SPI

On top of layer 1, CachedBatchSerializer gains a small opt-in:
- a flag indicating that the serializer produces batches whose payload is owned by an external engine;
- a NativeCachedBatch trait carrying release() plus a callback that receives the NativeBlockId Spark assigns;
- a lightweight handle type (block id + row count + size + optional column stats) that flows through the cached RDD in place of the original payload.

When the flag is set, the cache builder takes a different code path. For each batch produced by the serializer:
1. Spark reserves storage memory via the layer-1 entry point under a deterministic id.
2. The serializer is informed of the id, so it can populate its own executor-local lookup table mapping the id back to the engine payload.
3. The Spark-visible CachedBatch is just the handle; the handle RDD is persisted at MEMORY_ONLY (handles are tiny).
4. On scan, the serializer's convertCachedBatchTo* methods resolve the handle's block id via the lookup table and hand back the native payload directly.
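The four Layer 2 steps can be sketched the same way. Everything here is hypothetical: NativeCachedBatch, CachedBatchHandle, FakeEngine, NativeCacheFork, buildHandles, resolve, and the reserve function (standing in for the Layer 1 entry point) are illustrative names rather than existing Spark APIs, and column statistics are left out of the handle. The sketch shows the deterministic id, the engine-side lookup table, and a scan that resolves a handle without copying the payload.

```scala
import scala.collection.concurrent.TrieMap

// Illustrative sketch of the proposed Layer 2 flow; none of these names are Spark APIs.
final case class NativeBlockId(rddId: Int, partition: Int, batchIndex: Int)

// Opt-in trait for batches whose payload is owned by an external engine.
trait NativeCachedBatch {
  def numRows: Int
  def sizeInBytes: Long
  def release(): Unit
  def onBlockIdAssigned(id: NativeBlockId): Unit
}

// The only thing Spark persists (at MEMORY_ONLY); column stats omitted here.
final case class CachedBatchHandle(blockId: NativeBlockId, numRows: Int, sizeInBytes: Long)

// Toy engine: an executor-local table from block id back to the native payload.
object FakeEngine {
  val lookup = TrieMap.empty[NativeBlockId, FakeBatch]

  final class FakeBatch(val numRows: Int, val sizeInBytes: Long, val payload: String)
      extends NativeCachedBatch {
    @volatile private var assigned: Option[NativeBlockId] = None
    def onBlockIdAssigned(id: NativeBlockId): Unit = { assigned = Some(id); lookup.put(id, this) }
    def release(): Unit = assigned.foreach(id => lookup.remove(id)) // engine frees its bytes here
  }
}

object NativeCacheFork {
  // Steps 1 to 3 for one partition; `reserve` stands in for the Layer 1 entry point.
  def buildHandles(
      rddId: Int,
      partition: Int,
      batches: Iterator[NativeCachedBatch],
      reserve: (NativeBlockId, Long, () => Unit) => Boolean): Iterator[CachedBatchHandle] =
    batches.zipWithIndex.map { case (batch, i) =>
      val id = NativeBlockId(rddId, partition, i) // 1. deterministic id
      if (!reserve(id, batch.sizeInBytes, () => batch.release())) {
        batch.release()
        throw new IllegalStateException(s"cannot reserve storage memory for $id")
      }
      batch.onBlockIdAssigned(id) // 2. engine records id -> payload
      CachedBatchHandle(id, batch.numRows, batch.sizeInBytes) // 3. only the handle reaches Spark
    }

  // Step 4: a scan resolves the handle to the engine's payload with no copy.
  def resolve(h: CachedBatchHandle): FakeEngine.FakeBatch =
    FakeEngine.lookup.getOrElse(
      h.blockId,
      throw new IllegalStateException(s"${h.blockId} was released; recompute the partition"))
}
```

One thing the sketch makes visible: because reservation (step 1) happens before the serializer learns its id (step 2), an eviction that fired between the two would call release() before the engine recorded the id, leaving a stale lookup entry. A real implementation would presumably keep the block locked until step 2 completes, which ties into the lock-semantics question below.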
Because the handle still carries column statistics in the existing SimpleMetricsCachedBatch schema, partition pruning continues to work without changes to the cache scan operator.

The cache-eviction story is now closed-loop: Spark's LRU picks the least recently used entry (JVM or native), MemoryStore calls the registered release(), the native engine frees its bytes, and the next scan recomputes the affected partitions.

4. Why this is more general than the previous attempts

- *Unified LRU, not a preference flag.* Native and JVM blocks live in the same entries map; the existing eviction algorithm handles them uniformly. No new tunable for users to misconfigure.
- *No new configs.* It reuses spark.memory.{offHeap.enabled,offHeap.size,fraction,storageFraction} exactly as today.
- *No MemoryStore clone.* Only a new entry variant alongside the existing two: small surface, same lifecycle hooks.
- *Not bound to a single use case.* "Native file cache" and "native columnar cache" are both just consumers of the same core primitive. Anything with an opaque, off-heap payload and release() semantics fits.
- *Solves the serdes problem.* Native engines can opt into the SPI and have df.persist(MEMORY_ONLY) stay zero-copy end to end. The columnar cache SPI stops being a forced serialization boundary.

5. Trade-offs to discuss

A few things worth surfacing:
- *No replication, no spill, no remote fetch.* Native blocks are executor-local and opaque to Spark; the SPI explicitly rejects storage levels that request disk or replication > 1. Engines that want spill or replication have to implement it themselves on top.
- *Dataset.cache() defaults to MEMORY_AND_DISK*, which the native path would reject. Users must call persist(MEMORY_ONLY) explicitly. We could relax this with a silent downgrade, or expose a new canonical storage-level constant. I lean toward being explicit for the initial cut, but I'm happy to hear opinions.
- *Cluster-mode unpersist cleanup is best-effort.* Today the cache builder only touches the driver's local BlockManager; remote executors release their native blocks via LRU eviction or at shutdown. A driver -> executor cleanup RPC is a natural follow-up.

6. What I have ready

I have a 3-commit stack on top of master that I'd like to put up for review once a JIRA umbrella is filed:
1. [CORE] Add native block storage for opaque external-engine payloads: the new BlockId subtype, the MemoryStore entry variant, the BlockManager entry point, and tests covering reservation, eviction, release-on-evict, and release-throws.
2. [CORE] Harden native block storage: type enforcement, duplicate-put detection, replicate/master guards.
3. [SQL] Wire the SPI into the columnar cache builder, with end-to-end tests using a fake native serializer that exercises the round trip through df.persist(MEMORY_ONLY) and eviction.

7. What I'm asking

1. Is the community OK with BlockManager growing a "register external payload" surface? I think this is the right home for it (the alternative is yet another sidecar registry), but I want to confirm before pushing PRs.
2. Could reviewers familiar with MemoryStore / UnifiedMemoryManager / CachedBatchSerializer weigh in on the API shape? In particular: the memory-mode choice, lock semantics inside release(), the registration callback, and how strict the disk / replication rejection should be.
3. Is there anything I'm missing from a prior discussion that already converged on a different direction?

Thanks for reading. I'd love to hear what people think before I take this further.

Best,
Chang Chen
