[ https://issues.apache.org/jira/browse/AVRO-1821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nandor Kollar updated AVRO-1821:
--------------------------------
    Fix Version/s: 1.8.1

> Avro (Java) Memory Leak in ReflectData Caching
> ----------------------------------------------
>
>                 Key: AVRO-1821
>                 URL: https://issues.apache.org/jira/browse/AVRO-1821
>             Project: Avro
>          Issue Type: Bug
>          Components: java
>         Environment: OS X 10.11.3
> {code}java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode){code}
>            Reporter: Bryan Harclerode
>            Assignee: Bryan Harclerode
>             Fix For: 1.8.1
>
>         Attachments: 0001-AVRO-1821-Fix-memory-leak-of-Schemas-in-ReflectData.patch
>
>
> I think I have encountered one of the memory leaks described by AVRO-1283 in
> the way Java Avro implements field accessor caching in {{ReflectData}}. When a
> reflected object is serialized, {{ClassAccessorData.bySchema}} (retained via
> {{ReflectData.ACCESSOR_CACHE}}) keeps a strong reference to the schema used to
> serialize the object as its map key, and there is no code path that clears
> these references once a schema is no longer used. While in most cases a class
> will probably only have one schema associated with it (created and cached by
> {{ReflectData.getSchema(Type)}}), I experienced {{OutOfMemoryError}} when
> serializing generic classes with dynamically-generated schemas. The following
> minimal example exhausts a 50MiB heap ({{-Xmx50m}}) after about 190K
> iterations:
> {code:title=AvroMemoryLeakMinimal.java|borderStyle=solid}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.Collections;
> import org.apache.avro.Schema;
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.EncoderFactory;
> import org.apache.avro.reflect.ReflectDatumWriter;
> public class AvroMemoryLeakMinimal {
>     public static void main(String[] args) throws IOException {
>         long count = 0;
>         EncoderFactory encFactory = EncoderFactory.get();
>         try {
>             while (true) {
>                 // Create schema
>                 Schema schema = Schema.createRecord("schema", null, null, false);
>                 schema.setFields(Collections.<Schema.Field>emptyList());
>                 // serialize
>                 ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
>                 BinaryEncoder encoder = encFactory.binaryEncoder(baos, null);
>                 (new ReflectDatumWriter<Object>(schema)).write(new Object(), encoder);
>                 byte[] result = baos.toByteArray();
>                 count++;
>             }
>         } catch (OutOfMemoryError e) {
>             System.out.print("Memory exhausted after ");
>             System.out.print(count);
>             System.out.println(" schemas");
>             throw e;
>         }
>     }
> }
> {code}
> I was able to fix the bug in the latest 1.9.0-SNAPSHOT from git with the
> following patch, which changes {{ClassAccessorData.bySchema}} to use weak keys
> so that it releases the {{Schema}} objects once nothing else holds a strong
> reference to them:
> {code:title=ReflectData.java.patch|borderStyle=solid}
> --- a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
> +++ b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
> @@ -57,6 +57,7 @@ import org.apache.avro.io.DatumWriter;
>  import org.apache.avro.specific.FixedSize;
>  import org.apache.avro.specific.SpecificData;
>  import org.apache.avro.SchemaNormalization;
> +import org.apache.avro.util.WeakIdentityHashMap;
>  import org.codehaus.jackson.JsonNode;
>  import org.codehaus.jackson.node.NullNode;
>  
> @@ -234,8 +235,8 @@ public class ReflectData extends SpecificData {
>      private final Class<?> clazz;
>      private final Map<String, FieldAccessor> byName =
>          new HashMap<String, FieldAccessor>();
> -    private final IdentityHashMap<Schema, FieldAccessor[]> bySchema =
> -        new IdentityHashMap<Schema, FieldAccessor[]>();
> +    private final WeakIdentityHashMap<Schema, FieldAccessor[]> bySchema =
> +        new WeakIdentityHashMap<Schema, FieldAccessor[]>();
>          
>      private ClassAccessorData(Class<?> c) {
>        clazz = c;
> {code}
> Additionally, I'm not sure why an {{IdentityHashMap}} was used instead of a
> standard {{HashMap}}, since two equivalent schemas have the same set of
> {{FieldAccessor}}s. Everything appears to work and all tests pass if I use a
> {{WeakHashMap}} instead of a {{WeakIdentityHashMap}}, but I don't know whether
> there was some other reason object identity was important for this map. If a
> non-identity map can be used, it would further reduce memory and CPU usage by
> not regenerating all the field accessors for equivalent schemas.
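> For illustration only (this is not part of the attached patch, and the class
> name is just a placeholder), here is a small sketch of why a non-identity map
> would coalesce equivalent schemas; it relies on {{Schema}}'s structural
> {{equals()}}/{{hashCode()}}:
> {code:title=EquivalentSchemaKeys.java|borderStyle=solid}
> import java.util.Collections;
> import java.util.WeakHashMap;
> import org.apache.avro.Schema;
> public class EquivalentSchemaKeys {
>     public static void main(String[] args) {
>         WeakHashMap<Schema, String> cache = new WeakHashMap<Schema, String>();
>         // Two structurally identical schemas built independently
>         Schema a = Schema.createRecord("schema", null, null, false);
>         a.setFields(Collections.<Schema.Field>emptyList());
>         Schema b = Schema.createRecord("schema", null, null, false);
>         b.setFields(Collections.<Schema.Field>emptyList());
>         cache.put(a, "accessors built for a");
>         // Schema overrides equals()/hashCode() structurally, so the equivalent
>         // schema b finds the entry created for a and nothing is regenerated.
>         System.out.println(cache.get(b));
>     }
> }
> {code}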
> The following unit test appears to reliably catch this bug, but is 
> non-deterministic due to the nature of garbage collection (and I'm not sure 
> there's a way around that):
> {code:title=TestReflectData.java|borderStyle=solid}
> package org.apache.avro.reflect;
> import org.apache.avro.Schema;
> import org.junit.Test;
> import java.io.IOException;
> import java.lang.reflect.Field;
> import java.util.Collections;
> import java.util.Map;
> import static org.hamcrest.Matchers.lessThan;
> import static org.junit.Assert.assertThat;
> public class TestReflectData {
>     /**
>      * Test if ReflectData is leaking {@link Schema} references 
>      */
>     @SuppressWarnings("unchecked")
>     @Test public void testWeakSchemaCaching() throws IOException, NoSuchFieldException, IllegalAccessException {
>         for (int i = 0; i < 1000; i++) {
>             // Create schema
>             Schema schema = Schema.createRecord("schema", null, null, false);
>             schema.setFields(Collections.<Schema.Field>emptyList());
>             ReflectData.get().getRecordState(new Object(), schema);
>         }
>         // Reflect the number of schemas currently in the cache
>         Field cacheField = ReflectData.class.getDeclaredField("ACCESSOR_CACHE");
>         cacheField.setAccessible(true);
>         Map<Class<?>, ?> ACCESSOR_CACHE = (Map) cacheField.get(null);
>         Object classData = ACCESSOR_CACHE.get(Object.class);
>         Field bySchemaField = classData.getClass().getDeclaredField("bySchema");
>         bySchemaField.setAccessible(true);
>         Map<Schema, FieldAccessor[]> accessors = (Map) bySchemaField.get(classData);
>         System.gc(); // Not guaranteed reliable, but seems to be reliable enough for our purposes
>         // See if the number of schemas in the cache is less than the number we generated - if so, then they are being released.
>         assertThat("ReflectData cache should release references", accessors.size(), lessThan(1000));
>     }
> }
> {code}
> (Added {{org.hamcrest:hamcrest-all}} dependency to test scope for the 
> built-in {{lessThan()}} matcher)
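> A rough sketch of what that test-scope dependency declaration might look like
> in the module's pom.xml (the version shown is only an example):
> {code:title=pom.xml (sketch)|borderStyle=solid}
> <dependency>
>   <groupId>org.hamcrest</groupId>
>   <artifactId>hamcrest-all</artifactId>
>   <version>1.3</version>
>   <scope>test</scope>
> </dependency>
> {code}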
> ----
> The current workaround I'm using to mitigate the leak is to cache schemas and
> re-use older instances whenever I'm about to serialize an equivalent schema.
> Since most of the generated schemas are equivalent, this limits the number of
> leaked schemas to a handful. A more permanent workaround would be to switch to
> a {{GenericRecord}} instead of a generic Java class for the object being
> serialized, since that cuts out the use of {{ReflectData}} entirely.
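> For what it's worth, a minimal sketch of that interning workaround (the
> {{SchemaInterner}} name and shape are purely illustrative, not anything that
> exists in Avro):
> {code:title=SchemaInterner.java|borderStyle=solid}
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import org.apache.avro.Schema;
> /** Hypothetical helper: reuse the first instance of each equivalent schema. */
> public class SchemaInterner {
>     private final Map<Schema, Schema> seen = new ConcurrentHashMap<Schema, Schema>();
>     public Schema intern(Schema schema) {
>         // Relies on Schema's structural equals()/hashCode(); the first
>         // equivalent instance wins and is returned on every later call.
>         Schema previous = seen.putIfAbsent(schema, schema);
>         return previous != null ? previous : schema;
>     }
> }
> {code}
> Passing {{interner.intern(schema)}} to {{ReflectDatumWriter}} means the
> identity-keyed cache only ever sees one instance per distinct schema shape,
> which is what keeps the number of leaked entries to a handful.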



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
