Repository: incubator-beam
Updated Branches:
  refs/heads/master afa0c31bd -> bfc527d63


Changes in AvroCoder serialization so it can serialize in Kryo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06c18468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06c18468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06c18468

Branch: refs/heads/master
Commit: 06c1846860176cc2bd971f8ad7037c97594af866
Parents: afa0c31
Author: Aviem Zur <aviem...@gmail.com>
Authored: Thu Sep 8 11:21:41 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 8 07:47:34 2016 -0800

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |   7 ++
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 126 +++++++++++--------
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  33 +++++
 3 files changed, 112 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 17ef193..c7b46d8 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -473,5 +473,12 @@
       <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.21</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 7894d14..4f0239e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -164,7 +163,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   };
 
   private final Class<T> type;
-  private final transient Schema schema;
+  private transient Schema schema;
+
+  private final String schemaStr;
 
   private final List<String> nonDeterministicReasons;
 
@@ -174,36 +175,16 @@ public class AvroCoder<T> extends StandardCoder<T> {
   // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
   // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
   // an inner coder.
-  private final transient ThreadLocal<BinaryDecoder> decoder;
-  private final transient ThreadLocal<BinaryEncoder> encoder;
-  private final transient ThreadLocal<DatumWriter<T>> writer;
-  private final transient ThreadLocal<DatumReader<T>> reader;
+  private transient ThreadLocal<BinaryDecoder> memoizedDecoder;
+  private transient ThreadLocal<BinaryEncoder> memoizedEncoder;
+  private transient ThreadLocal<DatumWriter<T>> memoizedWriter;
+  private transient ThreadLocal<DatumReader<T>> memoizedReader;
 
   protected AvroCoder(Class<T> type, Schema schema) {
     this.type = type;
     this.schema = schema;
-
+    this.schemaStr = schema.toString();
     nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
-
-    // Decoder and Encoder start off null for each thread. They are allocated and potentially
-    // reused inside encode/decode.
-    this.decoder = new ThreadLocal<>();
-    this.encoder = new ThreadLocal<>();
-
-    // Reader and writer are allocated once per thread and are "final" for thread-local Coder
-    // instance.
-    this.reader = new ThreadLocal<DatumReader<T>>() {
-      @Override
-      public DatumReader<T> initialValue() {
-        return createDatumReader();
-      }
-    };
-    this.writer = new ThreadLocal<DatumWriter<T>>() {
-      @Override
-      public DatumWriter<T> initialValue() {
-        return createDatumWriter();
-      }
-    };
   }
 
   /**
@@ -246,33 +227,29 @@ public class AvroCoder<T> extends StandardCoder<T> {
     return type;
   }
 
-  private Object writeReplace() {
-    // When serialized by Java, instances of AvroCoder should be replaced by
-    // a SerializedAvroCoderProxy.
-    return new SerializedAvroCoderProxy<>(type, schema.toString());
-  }
-
   @Override
   public void encode(T value, OutputStream outStream, Context context) throws IOException {
     // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
+    ThreadLocal<BinaryEncoder> encoder = getEncoder();
     BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
     // Save the potentially-new instance for reuse later.
     encoder.set(encoderInstance);
-    writer.get().write(value, encoderInstance);
+    getWriter().get().write(value, encoderInstance);
     // Direct binary encoder does not buffer any data and need not be flushed.
   }
 
   @Override
   public T decode(InputStream inStream, Context context) throws IOException {
     // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
+    ThreadLocal<BinaryDecoder> decoder = getDecoder();
     BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
     // Save the potentially-new instance for later.
     decoder.set(decoderInstance);
-    return reader.get().read(null, decoderInstance);
+    return getReader().get().read(null, decoderInstance);
   }
 
   @Override
-    public List<? extends Coder<?>> getCoderArguments() {
+  public List<? extends Coder<?>> getCoderArguments() {
     return null;
   }
 
@@ -280,7 +257,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
   public CloudObject asCloudObject() {
     CloudObject result = super.asCloudObject();
     addString(result, "type", type.getName());
-    addString(result, "schema", schema.toString());
+    addString(result, "schema", getSchema().toString());
     return result;
   }
 
@@ -306,9 +283,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   @Deprecated
   public DatumReader<T> createDatumReader() {
     if (type.equals(GenericRecord.class)) {
-      return new GenericDatumReader<>(schema);
+      return new GenericDatumReader<>(getSchema());
     } else {
-      return new ReflectDatumReader<>(schema);
+      return new ReflectDatumReader<>(getSchema());
     }
   }
 
@@ -321,9 +298,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
   @Deprecated
   public DatumWriter<T> createDatumWriter() {
     if (type.equals(GenericRecord.class)) {
-      return new GenericDatumWriter<>(schema);
+      return new GenericDatumWriter<>(getSchema());
     } else {
-      return new ReflectDatumWriter<>(schema);
+      return new ReflectDatumWriter<>(getSchema());
     }
   }
 
@@ -331,28 +308,69 @@ public class AvroCoder<T> extends StandardCoder<T> {
    * Returns the schema used by this coder.
    */
   public Schema getSchema() {
-    return schema;
+    return getMemoizedSchema();
+  }
+
+  /**
+   * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily.
+   */
+  private ThreadLocal<BinaryDecoder> getDecoder() {
+    if (memoizedDecoder == null) {
+      memoizedDecoder = new ThreadLocal<>();
+    }
+    return memoizedDecoder;
+  }
+
+  /**
+   * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily.
+   */
+  private ThreadLocal<BinaryEncoder> getEncoder() {
+    if (memoizedEncoder == null) {
+      memoizedEncoder = new ThreadLocal<>();
+    }
+    return memoizedEncoder;
   }
 
   /**
-   * Proxy to use in place of serializing the {@link AvroCoder}. This allows the fields
-   * to remain final.
+   * Get the memoized {@link DatumReader}, possibly initializing it lazily.
    */
-  private static class SerializedAvroCoderProxy<T> implements Serializable {
-    private final Class<T> type;
-    private final String schemaStr;
+  private ThreadLocal<DatumReader<T>> getReader() {
+    if (memoizedReader == null) {
+      memoizedReader = new ThreadLocal<DatumReader<T>>() {
+        @Override
+        public DatumReader<T> initialValue() {
+          return createDatumReader();
+        }
+      };
+    }
+    return memoizedReader;
+  }
 
-    public SerializedAvroCoderProxy(Class<T> type, String schemaStr) {
-      this.type = type;
-      this.schemaStr = schemaStr;
+  /**
+   * Get the memoized {@link DatumWriter}, possibly initializing it lazily.
+   */
+  private ThreadLocal<DatumWriter<T>> getWriter() {
+    if (memoizedWriter == null) {
+      memoizedWriter = new ThreadLocal<DatumWriter<T>>() {
+        @Override
+        public DatumWriter<T> initialValue() {
+          return createDatumWriter();
+        }
+      };
     }
+    return memoizedWriter;
+  }
 
-    private Object readResolve() {
-      // When deserialized, instances of this object should be replaced by
-      // constructing an AvroCoder.
+  /**
+   * Get the {@link Schema}, possibly initializing it lazily by parsing {@link
+   * AvroCoder#schemaStr}.
+   */
+  private Schema getMemoizedSchema() {
+    if (schema == null) {
       Schema.Parser parser = new Schema.Parser();
-      return new AvroCoder<T>(type, parser.parse(schemaStr));
+      schema = parser.parse(schemaStr);
     }
+    return schema;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index f6329a0..f2373d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -39,6 +39,10 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -172,6 +176,35 @@ public class AvroCoderTest {
     CoderProperties.coderDecodeEncodeEqual(copied, value);
   }
 
+  /**
+   * Confirm that we can serialize and deserialize an AvroCoder object using Kryo.
+   * (BEAM-626).
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testKryoSerialization() throws Exception {
+    Pojo value = new Pojo("Hello", 42);
+    AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+
+    //Kryo instantiation
+    Kryo kryo = new Kryo();
+    kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+
+    //Serialization of object
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Output output = new Output(bos);
+    kryo.writeObject(output, coder);
+    output.close();
+
+    //De-serialization of object
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    Input input = new Input(bis);
+    AvroCoder<Pojo> copied = (AvroCoder<Pojo>) kryo.readObject(input, AvroCoder.class);
+
+    CoderProperties.coderDecodeEncodeEqual(copied, value);
+  }
+
   @Test
   public void testPojoEncoding() throws Exception {
     Pojo value = new Pojo("Hello", 42);

Reply via email to