Hi Chang,

One concern on Layer 1: I don't think the "unified LRU" property holds as
stated.

MemoryStore.entries is an access-order LinkedHashMap. Per the LinkedHashMap
contract, only access-generating calls such as get, put, compute, and merge
move a key to the most-recently-used end; containsKey and iteration over the
collection views do not. Inside MemoryStore, the only methods that refresh
position are getBytes and getValues, both called from the JVM read path in
BlockManager.
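
A minimal standalone sketch of that contract, using a plain
java.util.LinkedHashMap rather than Spark's MemoryStore (the class name and
keys are made up for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

public class AccessOrderDemo {
    /** Returns the keys in iteration order (least recently accessed first). */
    static List<String> order(LinkedHashMap<String, Integer> m) {
        // Reading the key-set view does not count as an access.
        return new ArrayList<>(m.keySet());
    }

    public static void main(String[] args) {
        // accessOrder = true: iteration runs from least to most recently accessed.
        LinkedHashMap<String, Integer> entries = new LinkedHashMap<>(16, 0.75f, true);
        entries.put("a", 1);
        entries.put("b", 2);
        entries.put("c", 3);

        entries.containsKey("a");                 // not an access: order unchanged
        System.out.println(order(entries));       // [a, b, c]

        entries.get("a");                         // an access: "a" moves to the end
        System.out.println(order(entries));       // [b, c, a]
    }
}
```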

The proposal makes those read paths refuse native blocks, because the payload
is opaque and only the owning engine can read it. So a native entry inserted
at T0 keeps the access-order position it had at T0, while every later read or
insert of a JVM block places that JVM block ahead of it. After a few read
cycles the native entry sits at the oldest end of iteration order and stays
there. evictBlocksToFreeSpace walks iteration order, so the native block gets
picked first under any pressure, no matter how often the engine reads it.
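
The drift can be reproduced with a toy model. This is only a sketch under
stated assumptions: the map stands in for MemoryStore.entries, the block
names are hypothetical, and "eviction order" here simply means the map's
iteration order, which is what an evictor walking the map would see.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

public class NativeDriftDemo {
    /** Builds a small store, runs one JVM read cycle, returns candidate order. */
    static List<String> evictionOrderAfterReads() {
        LinkedHashMap<String, String> entries = new LinkedHashMap<>(16, 0.75f, true);
        entries.put("jvm-1", "jvm");
        entries.put("native-1", "native");  // registered at T0
        entries.put("jvm-2", "jvm");

        // JVM blocks are read through the map, which refreshes their position.
        entries.get("jvm-1");
        entries.get("jvm-2");
        // Native reads go through the engine's own lookup table and never
        // call get() on this map, so "native-1" is not refreshed.

        // An evictor walking iteration order meets candidates in this order.
        return new ArrayList<>(entries.keySet());
    }

    public static void main(String[] args) {
        System.out.println(evictionOrderAfterReads());  // [native-1, jvm-1, jvm-2]
    }
}
```

Even though native-1 was inserted after jvm-1, a single read cycle leaves it
first in line.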

That isn't unified LRU; it's "native blocks are always among the oldest
entries." Fixing it requires exposing something like
MemoryStore.touch(blockId) that the native side calls on every read. That is
one synchronized lookup per read, a non-trivial cost for a hot Velox/Gluten
cache, and it deserves to be in the design conversation explicitly.
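
For discussion, a rough sketch of what such a touch could look like. None of
this is Spark code: TouchSketch, its String block ids, and the boolean return
value are assumptions for illustration, and the synchronized block is the
per-read cost mentioned above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

public class TouchSketch {
    // Stand-in for MemoryStore.entries: block id -> reserved size in bytes.
    private final LinkedHashMap<String, Long> entries =
        new LinkedHashMap<>(16, 0.75f, true);

    void put(String blockId, long size) {
        synchronized (entries) {
            entries.put(blockId, size);
        }
    }

    /**
     * Hypothetical touch: marks blockId as recently used. Returns false when
     * the block is not resident (for example, already evicted).
     */
    boolean touch(String blockId) {
        synchronized (entries) {
            // get() on an access-order map moves the entry to the recent end.
            return entries.get(blockId) != null;
        }
    }

    /** Iteration order, i.e. the order an evictor would consider blocks. */
    List<String> evictionOrder() {
        synchronized (entries) {
            return new ArrayList<>(entries.keySet());
        }
    }

    public static void main(String[] args) {
        TouchSketch store = new TouchSketch();
        store.put("native-1", 64L);
        store.put("jvm-1", 32L);
        System.out.println(store.evictionOrder());  // [native-1, jvm-1]

        store.touch("native-1");                    // native read path calls this
        System.out.println(store.evictionOrder());  // [jvm-1, native-1]
        System.out.println(store.touch("gone"));    // false
    }
}
```

Returning a boolean is one possible design choice: it would let the native
side notice that a block has already been evicted without a second lookup.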

Either commit to the touch API and accept its cost, or describe what eviction
policy native blocks actually get and why it's still acceptable. Both are
legitimate paths, but the current proposal hangs on a property that doesn't
follow from the mechanism alone.

Best,
Kent

On 2026/05/20 02:47:16 Chang Chen wrote:
> Hi all,
> 
> I would like to discuss a long-standing pain point for native execution
> engines (Gluten) built on Spark: *the memory held by the native side cannot
> participate in Spark's storage memory accounting as a first-class citizen*.
> I will summarize what has been tried, what's still missing, and a concrete
> mechanism I'd like the community's feedback on.
> 
> 1. The problem
> 
> When a native engine wants to cache columnar payloads next to Spark —
> native vectors, Arrow buffers, native file caches, etc. — the memory lives
> outside the JVM heap and is allocated through a separate allocator. Two
> things go wrong today:
> 
> *(a) The two pools don't share a budget.* The native allocator's budget is
> configured independently from spark.memory.storage /
> spark.memory.offHeap.size. As a result, the user has to statically
> partition memory between Spark and the native engine, and neither side can
> borrow from the other under pressure. Practical outcomes: the native cache
> spills aggressively while Spark storage sits idle, or vice versa.
> 
> *(b) Even when df.persist() is used to cache native results, the payload is
> copied through the JVM.* The columnar cache SPI today (CachedBatchSerializer)
> requires the produced CachedBatch to live as JVM-owned bytes so that
> MemoryStore can hold it as a DeserializedMemoryEntry / SerializedMemoryEntry.
> For any native engine, this means serializing each batch from native memory
> into a JVM byte[] on the write side and deserializing back into native
> memory on the read side. Every cache hit pays the cost of two off-heap <->
> on-heap copies (and any associated cross-boundary call overhead). The point
> of a native engine — staying off-heap and zero-copy — is defeated at the
> cache boundary.
> 
> 2. Prior attempts and why they fell short
> 
> I want to credit two prior efforts that explored adjacent designs:
> 
> *SPARK-48694 <https://issues.apache.org/jira/browse/SPARK-48694> /
> PR #47067 <https://github.com/apache/spark/pull/47067>* introduced an
> ExternalMemoryStore parallel to MemoryStore, sharing the storage pool
> budget, with three new configs (spark.memory.external.cache.enabled,
> spark.memory.external.storageFraction,
> spark.memory.storage.preferEvictExtCache). The eviction policy under
> pressure is a *static global boolean* — always prefer one store over the
> other — rather than unified per-block LRU. It also duplicates a fair amount
> of MemoryStore infrastructure into a parallel surface, and is shaped around
> the "file cache" use case rather than a general external-object lifecycle.
> The PR was eventually closed as stale.
> 
> *SPARK-56918 <https://issues.apache.org/jira/browse/SPARK-56918> /
> PR #55953 <https://github.com/apache/spark/pull/55953> (myself, a few days
> ago)* went a different way: I added a ManagedConsumer SPI to
> UnifiedMemoryManager, by analogy with UnmanagedConsumer. The native side
> registers as a consumer, Spark calls back to shrink it. Two problems
> surfaced in review:
> 
>    1. *The accounting is not symmetric.* Consumers are an execution-memory
>    concept; they are asked to spill under execution pressure. Storage blocks
>    have their own LRU within the storage pool. Bolting a "managed consumer"
>    onto storage means the native side is always the first to be reclaimed
>    under any pressure — there is no fair LRU between native and JVM entries.
>    2. *It does not address the serdes problem.* Consumers register a
>    *budget*; they don't give Spark anything to evict at batch granularity.
>    df.persist() still has to go through the JVM byte path.
> 
> Neither attempt gives the native engine a way to say *"I have this opaque
> payload, please count it against the storage pool and tell me when to
> release it."*
> 
> 3. What I'd like to propose
> 
> The core idea is small: *let BlockManager register and evict blocks whose
> bytes live outside the JVM*, then build the cache SPI on top.
> 
> Layer 1 — a "native block" primitive in core
> 
> A new BlockManager entry point lets a caller register an executor-local
> block with three pieces of information:
> 
>    - a new NativeBlockId subtype, so the block is unambiguously
>    distinguishable from JVM-resident blocks;
>    - the number of bytes to reserve in the storage memory pool (on- or
>    off-heap, the caller's choice);
>    - a release() callback that Spark invokes when the block is evicted or
>    removed.
> 
> If storage pressure forces eviction, the existing
> MemoryStore.evictBlocksToFreeSpace algorithm picks across *all* entries —
> JVM-resident and native — using the same LRU policy. No preference flag, no
> parallel store. When a native entry is chosen, release() is invoked exactly
> once and the reserved bytes return to the pool.
> 
> Read paths (getLocalValues, getLocalBytes, getLocalBlockData, get,
> replicateBlock) refuse native blocks with a clear error: the payload is
> opaque to Spark and must be read through the owning engine's own API. The
> master is never notified, because native payloads are inherently
> executor-local and cannot be fetched over the network.
> 
> So BlockManager becomes a unified registry for "this many bytes of storage
> are committed; call me back when you need them released," regardless of who
> owns the actual bytes.
> 
> Layer 2 — a native variant of the columnar cache SPI
> 
> On top of layer 1, CachedBatchSerializer grows a small opt-in:
> 
>    - a flag indicating that the serializer produces batches whose payload
>    is owned by an external engine;
>    - a NativeCachedBatch trait carrying release() plus a callback that
>    receives the NativeBlockId Spark assigns;
>    - a lightweight handle type (block id + row count + size + optional
>    column stats) that flows through the cached RDD in place of the original
>    payload.
> 
> When the flag is set, the cache builder takes a different fork. For each
> batch produced by the serializer:
> 
>    1. Spark reserves storage memory via the layer-1 entry point under a
>    deterministic id.
>    2. The serializer is informed of the id, so it can populate its own
>    executor-local lookup table mapping id back to the engine payload.
>    3. The Spark-visible CachedBatch is just the handle; the handle RDD is
>    persisted at MEMORY_ONLY (handles are tiny).
>    4. On scan, the serializer's convertCachedBatchTo* methods resolve the
>    handle's block id via the lookup table and hand back the native payload
>    directly.
> 
> Because the handle still carries column statistics in the existing
> SimpleMetricsCachedBatch schema, partition pruning continues to work
> without changes to the cache scan operator.
> 
> The cache-eviction story is now closed-loop: Spark LRU picks the oldest
> entry (JVM or native), MemoryStore calls the registered release(), the
> native engine frees its bytes, and the next scan recomputes the affected
> partitions.
> 
> 4. Why this is more general than the previous attempts
> 
>    - *Unified LRU, not a preference flag.* Native and JVM blocks live in
>    the same entries map; the existing eviction algorithm handles them
>    uniformly. No new tunable for users to misconfigure.
>    - *No new configs.* It reuses
>    spark.memory.{offHeap.enabled,offHeap.size,fraction,storageFraction}
>    exactly as today.
>    - *No MemoryStore clone.* Only a new entry variant alongside the
>    existing two — small surface, same lifecycle hooks.
>    - *Not bound to a single use case.* "Native file cache" and "native
>    columnar cache" are both just consumers of the same core primitive.
>    Anything with an opaque, off-heap payload and release() semantics fits.
>    - *Solves the serdes problem.* Native engines can opt into the SPI and
>    have df.persist(MEMORY_ONLY) stay zero-copy end to end. The columnar
>    cache SPI stops being a forced serialization boundary.
> 
> 5. Trade-offs to discuss
> 
> A few things worth surfacing:
> 
>    - *No replication, no spill, no remote fetch.* Native blocks are
>    executor-local and opaque to Spark; the SPI explicitly rejects storage
>    levels that request disk or replication > 1. Engines that want spill or
>    replication have to implement it themselves on top.
>    - *Dataset.cache() default is MEMORY_AND_DISK*, which the native path
>    would reject. Users must call persist(MEMORY_ONLY) explicitly. We could
>    relax this with a silent downgrade, or expose a new canonical storage-level
>    constant. I lean toward "be explicit" for the initial cut but happy to hear
>    opinions.
>    - *Cluster-mode unpersist cleanup is best-effort.* Today the cache
>    builder only touches the driver's local BlockManager; remote executors
>    will release their native blocks via LRU or shutdown. A driver -> executor
>    cleanup RPC is a natural follow-up.
> 
> 6. What I have ready
> 
> I have a 3-commit stack on top of master that I'd like to put up for review
> once a JIRA umbrella is filed:
> 
>    1. [CORE] Add native block storage for opaque external-engine payloads —
>    the new BlockId subtype, the MemoryStore variant, the BlockManager entry
>    point, and tests covering reservation, eviction, release-on-evict, and
>    release-throws.
>    2. [CORE] Harden native block storage — type enforcement, duplicate-put
>    detection, replicate/master guards.
>    3. [SQL] Wire the SPI into the columnar cache builder, with end-to-end
>    tests using a fake native serializer that exercises the round trip through
>    df.persist(MEMORY_ONLY) and eviction.
> 
> 7. What I'm asking
> 
>    1. Is the community OK with BlockManager growing a "register external
>    payload" surface? I think this is the right home for it — the alternative
>    is yet another sidecar registry — but I want to confirm before pushing PRs.
>    2. Reviewers familiar with MemoryStore / UnifiedMemoryManager /
>    CachedBatchSerializer who can weigh in on the API shape: memory-mode
>    choice, lock semantics inside release(), the registration callback, and
>    how strict the disk / replication rejection should be.
>    3. Anything I'm missing from a prior discussion that already converged
>    on a different direction.
> 
> Thanks for reading. I'd love to hear what people think before I take this
> further.
> 
> Best, Chang Chen
> 
