[ 
https://issues.apache.org/jira/browse/AVRO-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18078387#comment-18078387
 ] 

Michael Skells commented on AVRO-4253:
--------------------------------------

This seems to fix it - but at a cost!  
```
private RecordReader getRecordReaderFromCache(Schema readerSchema, Schema writerSchema) {
  if (writerSchema == readerSchema) {
    // We need to deep clone the schema, or there is a memory leak:
    // https://issues.apache.org/jira/browse/AVRO-4253
    readerSchema = new Schema.Parser().parse(readerSchema.toString());
  }

  return readerCache.computeIfAbsent(readerSchema, k -> new WeakIdentityHashMap<>())
      .computeIfAbsent(writerSchema, k -> new RecordReader());
}
```
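
For anyone who wants to see the underlying effect without Avro on the classpath, here is a minimal sketch using the JDK's WeakHashMap. Key and Value are hypothetical stand-ins for Schema and RecordReader, and WeakHashMap stands in for Avro's WeakIdentityHashMap (Key does not override equals/hashCode, so lookups are by identity here too). It is only an illustration of the value-references-key problem, not the Avro code path.

```java
import java.util.Map;
import java.util.WeakHashMap;

class WeakKeyPinningSketch {

  /** Hypothetical stand-in for Schema. No equals/hashCode override, so keys compare by identity. */
  static final class Key {
    final String json;

    Key(String json) {
      this.json = json;
    }
  }

  /** Hypothetical stand-in for RecordReader: keeps a strong reference to a Key. */
  static final class Value {
    final Key schema;

    Value(Key schema) {
      this.schema = schema;
    }
  }

  /** Asks the JVM to collect a few times. System.gc() is only a hint, not a guarantee. */
  static void requestGc() {
    for (int i = 0; i < 5; i++) {
      System.gc();
      try {
        Thread.sleep(20);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** The cached value points back at its own key, so the key stays strongly reachable. */
  static int sizeWhenValueReferencesKey() {
    Map<Key, Value> cache = new WeakHashMap<>();
    Key key = new Key("{\"type\":\"string\"}");
    cache.put(key, new Value(key));
    key = null; // drop the caller's only reference
    requestGc();
    return cache.size(); // size() also expunges entries whose keys were cleared
  }

  /** The cached value references a different object, so the key is only weakly reachable. */
  static int sizeWhenValueReferencesOtherObject() {
    Map<Key, Value> cache = new WeakHashMap<>();
    Key key = new Key("{\"type\":\"string\"}");
    Key other = new Key(key.json); // structurally equal, different identity
    cache.put(key, new Value(other));
    key = null; // drop the caller's only reference
    requestGc();
    return cache.size();
  }

  public static void main(String[] args) {
    System.out.println("value references key:   size=" + sizeWhenValueReferencesKey());
    System.out.println("value references other: size=" + sizeWhenValueReferencesOtherObject());
  }
}
```

The first case can never drop its entry, because the map itself keeps the key strongly reachable through the value. The second case becomes eligible for eviction once the caller lets go of the key, although when that actually happens is up to the collector.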

> unbounded memory leak in FastReaderBuilder
> ------------------------------------------
>
>                 Key: AVRO-4253
>                 URL: https://issues.apache.org/jira/browse/AVRO-4253
>             Project: Apache Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.12.1
>            Reporter: Michael Skells
>            Priority: Critical
>              Labels: performance
>         Attachments: FastReaderBuilderCacheRetentionReproducer.java
>
>
> # FastReaderBuilder WeakIdentityHashMap Cache Retention Bug
>  
>  ## Summary
>  
>  FastReaderBuilder's internal readerCache uses WeakIdentityHashMap to allow 
> cached RecordReader entries to be garbage-collected when their Schema keys 
> become unreachable. However, each RecordReader value holds a *strong 
> reference to a Schema*, so if the reader and writer schemas are the same 
> object, this prevents the weak reference from ever being cleared. 
>  The intended eviction behavior of WeakIdentityHashMap never occurs. Entries 
> accumulate for the lifetime of the FastReaderBuilder (and its owning 
> GenericData instance, which by default is a global static value).
>  
>  ## Affected Version
>  
>  Apache Avro 1.12.1 (and likely all versions since FastReaderBuilder was 
> introduced).
>  
>  ## Root Cause
>  
>  ### Cache structure
>  
>  java
>  // FastReaderBuilder.java
>  private final Map<Schema, Map<Schema, RecordReader>> readerCache =
>      Collections.synchronizedMap(new WeakIdentityHashMap<>());
>  
>  
>  The outer map keys are *reader schemas* (weak references). The inner map 
> keys are *writer schemas* (also weak references). Values are RecordReader 
> instances.
>  
>  ### RecordReader retains the key
>  
>  java
>  public static class RecordReader implements FieldReader {
>      private ExecutionStep[] readSteps;  // strong references to child RecordReaders
>      private Schema schema;              // ← STRONG reference to the reader Schema (the outer key)
>      private InstanceSupplier supplier;  // also references the Schema
>      // ...
>  
>      public void finishInitialization(ExecutionStep[] readSteps, Schema schema, InstanceSupplier supp) {
>          this.readSteps = readSteps;
>          this.schema = schema;          // ← stores the same Schema used as the weak key
>          this.supplier = supp;
>          this.stage = Stage.INITIALIZED;
>      }
>  }
>  
>  
>  The same RecordReader object is both returned to the caller AND stored in 
> the cache (via computeIfAbsent). When the caller discards the DatumReader 
> after reading, the cache still holds a strong reference to the RecordReader. 
> And RecordReader.schema holds a strong reference back to the Schema used as 
> the weak key.
>  
>  This is a classic *weak-key cache antipattern: value references key*. The 
> WeakIdentityHashMap can never clear the weak key because the cache's own 
> value keeps the key strongly reachable:
>  
>  
>  FastReaderBuilder (application lifetime)
>    └─ readerCache (WeakIdentityHashMap)
>         └─ entry: weak(Schema_A) → inner map
>              └─ entry: weak(Schema_A) → RecordReader  ← cache holds strong ref to value
>                   └─ schema → Schema_A                 ← value holds strong ref back to key
>  
>  
>  No external reference to Schema_A is needed to keep it alive — the cache is 
> self-sustaining.
>  
>  ### Transitive retention for nested schemas
>  
>  For a record schema with nested sub-records, initializeRecordReader() 
> recursively calls createRecordReader() for each child RECORD field. Each 
> child RecordReader is captured inside an ExecutionStep lambda (via 
> createFieldSetter()), and each child holds its own strong schema reference.
>  
>  For a schema with N nesting levels:
>  
>  
>  readerCache:
>    weak→ Level_0_Schema → Map:
>      weak→ Level_0_Writer → RecordReader_0
>        ├─ schema → Level_0_Schema      ← strong ref pinning the outer weak key
>        └─ readSteps[child_field] → ExecutionStep (lambda)
>             └─ captures RecordReader_1
>                  ├─ schema → Level_1_Schema  ← pinning Level_1's weak key
>                  └─ readSteps[child_field] → ExecutionStep (lambda)
>                       └─ captures RecordReader_2
>                            └─ ... (N levels deep)
>  
>  
>  The entire N-level chain is pinned. WeakIdentityHashMap.reap() will never 
> remove any of these entries.
>  
>  ### Impact on WeakIdentityHashMap.reap()
>  
>  reap() polls the ReferenceQueue for cleared weak references:
>  
>  java
>  private synchronized void reap() {
>      Object zombie = queue.poll();
>      while (zombie != null) {
>          IdentityWeakReference victim = (IdentityWeakReference) zombie;
>          backingStore.remove(victim);
>          zombie = queue.poll();
>      }
>  }
>  
>  
>  Since RecordReader.schema keeps the key Schema strongly reachable, the 
> IdentityWeakReference is never enqueued → reap() never removes entries → 
> cache grows monotonically.
>  
>  ## Reproduction
>  
>  See the attached standalone Java file 
> FastReaderBuilderCacheRetentionReproducer.java which:
>  
>  1. Creates a 5-level nested schema (100 fields total)
>  2. Calls FastReaderBuilder.createDatumReader(schema) 10 times with 
> freshly-parsed Schema objects (different identity, same structure — 
> simulating independent file header parsing)
>  3. Releases all Schema references and forces GC
>  4. Observes that both outer and inner cache entries are retained (schemas 
> are NOT collected)
>  
>  *Key detail:* The single-arg createDatumReader(schema) uses the same Schema 
> identity for both writer and reader cache keys. RecordReader.schema then 
> holds a strong reference to that same object, creating a cycle that prevents 
> either weak key from being cleared.
>  
>  When writer and reader are different Schema identities (as happens when 
> DataFileStream parses its own header schema), the writer schema can be GC'd 
> independently, breaking the chain. This reproducer uses the same-identity 
> path to demonstrate the retention.
>  
>  Compile and run:
>  
>  bash
>  javac -cp avro-1.12.1.jar FastReaderBuilderCacheRetentionReproducer.java
>  java -cp .:avro-1.12.1.jar FastReaderBuilderCacheRetentionReproducer
>  
>  
>  Expected output:
>  
>  
>  After schema 1: outer=5 inner=5
>  After schema 2: outer=10 inner=10
>  ...
>  After schema 10: outer=50 inner=50
>  
>  --- Forcing GC (5 cycles with reap) ---
>  
>  After GC:
>    Outer cache entries (reader schemas): 50
>    Inner cache entries (writer schemas):    50
>    Schema WeakRefs cleared by GC:           0 / 10
>  
>    Expected if cache eviction works:
>      outer=0, inner=0, schemas collected=10
>    Expected with retention bug:
>      outer=10, inner=50, schemas collected=0
>  
>  BUG CONFIRMED: Cache retained entries after GC.
>    RecordReader.schema holds strong refs to Schema keys,
>    preventing WeakIdentityHashMap from clearing entries.
>    Schema objects are NOT being collected (pinned by cache values).
>  
>  ## Suggested Fix
>  
>  If the two schemas are the same object, deep clone the reader schema. 
> It can't be a shallow clone, as that would only free one level of the cache, 
> unless cache removal is changed to remove the whole tree (currently the tree 
> gets removed by the weak references).
> A shallow clone would mean that you need n cycles of GC, finalisation, and map 
> access to clear the cache.
>  
>  Alternatively, rework RecordReader to remove the schema reference or make it 
> weak (I don't know if that would work, though; it is probably too hard to 
> consider).
>  
>  ## Workarounds
>  
>  Applications can mitigate this by:
>  
>  1. *Schema interning* — ensure all Schema objects passed to 
> FastReaderBuilder are canonical (same identity for structurally equal 
> schemas). This makes the cache bounded to the number of distinct schemas, 
> which is typically small. The entries are never evicted but the memory is 
> bounded.
>  
>  2. *Per-task GenericData instances* — create a new GenericData (and thus new 
> FastReaderBuilder) for each batch of work, and discard it when done. The 
> entire cache is collected when the GenericData becomes unreachable.
>  
>  3. *Disable FastReader* — set GenericData.setFastReaderEnabled(false) and 
> rely on the standard ResolvingDecoder path, which uses a separate 
> ThreadLocal<WeakIdentityHashMap> cache (same retention issue but scoped to 
> thread lifetime).
> (1) is accomplished by https://issues.apache.org/jira/browse/AVRO-4249 / 
> https://github.com/apache/avro/pull/3746
> What I did to fully mitigate this was the above, together with (2): periodic 
> recycling of the GenericData (and therefore its FastReaderBuilder).
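
The periodic recycling described in workaround (2) could be sketched roughly as follows. RecyclingHolder, maxUses, and the use-count policy are all hypothetical names and choices made for illustration. In an Avro application the factory would presumably create a fresh GenericData, and callers would build their readers from whatever get() returns. The sketch uses only the JDK, so it runs without Avro.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: hands out a shared instance and replaces it with a fresh one
 * after a fixed number of uses, so that anything cached inside the old instance
 * becomes unreachable once callers stop using it.
 */
final class RecyclingHolder<T> {
  private final Supplier<T> factory;
  private final int maxUses;
  private T current;
  private int uses;

  RecyclingHolder(Supplier<T> factory, int maxUses) {
    if (maxUses <= 0) {
      throw new IllegalArgumentException("maxUses must be positive");
    }
    this.factory = factory;
    this.maxUses = maxUses;
  }

  /** Returns the current instance, creating a new one when the use budget is spent. */
  synchronized T get() {
    if (current == null || uses >= maxUses) {
      current = factory.get(); // the old instance is no longer referenced from here
      uses = 0;
    }
    uses++;
    return current;
  }

  public static void main(String[] args) {
    // Object::new stands in for a factory that would create a fresh GenericData.
    RecyclingHolder<Object> holder = new RecyclingHolder<>(Object::new, 2);
    Object first = holder.get();
    Object second = holder.get();
    Object third = holder.get();
    System.out.println("first == second: " + (first == second));
    System.out.println("first == third:  " + (first == third));
  }
}
```

A time-based or batch-based trigger would work equally well. The only point is that the holder must not keep the old instance alive after replacing it.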



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
