Repository: beam
Updated Branches:
  refs/heads/master 9db5f746a -> daed01a69


Use a ThreadLocal to for Marshaller/Unmarshaller in JAXBCoder

This allows reuse of thread-unsafe marshallers and unmarshallers while
encoding elements, while the coder remains thread-safe.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf0b990b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf0b990b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf0b990b

Branch: refs/heads/master
Commit: cf0b990b0336f46b4d4775c93e86bd2310d622b5
Parents: e1ee05e
Author: Kai Jiang <[email protected]>
Authored: Thu Jan 19 05:06:13 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Mon Jan 23 11:46:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/JAXBCoder.java   | 36 +++++++++++++++-----
 1 file changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cf0b990b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 0a4f9cc..ea636fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -30,6 +30,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
 import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -45,6 +46,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   private final Class<T> jaxbClass;
   private final TypeDescriptor<T> typeDescriptor;
   private transient volatile JAXBContext jaxbContext;
+  private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
+  private final EmptyOnDeserializationThreadLocal<Unmarshaller> 
jaxbUnmarshaller;
 
   public Class<T> getJAXBClass() {
     return jaxbClass;
@@ -53,6 +56,28 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   private JAXBCoder(Class<T> jaxbClass) {
     this.jaxbClass = jaxbClass;
     this.typeDescriptor = TypeDescriptor.of(jaxbClass);
+    this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
+      @Override
+      protected Marshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createMarshaller();
+        } catch (JAXBException e) {
+          throw new RuntimeException("Error when creating marshaller from JAXB 
Context.", e);
+        }
+      }
+    };
+    this.jaxbUnmarshaller = new 
EmptyOnDeserializationThreadLocal<Unmarshaller>() {
+      @Override
+      protected Unmarshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createUnmarshaller();
+        } catch (Exception e) {
+          throw new RuntimeException("Error when creating unmarshaller from 
JAXB Context.", e);
+        }
+      }
+    };
   }
 
   /**
@@ -68,9 +93,6 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     try {
-      JAXBContext jaxbContext = getContext();
-      // TODO: Consider caching in a ThreadLocal if this impacts performance
-      Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
       if (!context.isWholeStream) {
         try {
           long size = getEncodedElementByteSize(value, Context.OUTER);
@@ -83,7 +105,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
         }
       }
 
-      jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream));
+      jaxbMarshaller.get().marshal(value, new 
CloseIgnoringOutputStream(outStream));
     } catch (JAXBException e) {
       throw new CoderException(e);
     }
@@ -92,17 +114,13 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   @Override
   public T decode(InputStream inStream, Context context) throws 
CoderException, IOException {
     try {
-      JAXBContext jaxbContext = getContext();
-      // TODO: Consider caching in a ThreadLocal if this impacts performance
-      Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
       InputStream stream = inStream;
       if (!context.isWholeStream) {
         long limit = VarInt.decodeLong(inStream);
         stream = ByteStreams.limit(inStream, limit);
       }
       @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.unmarshal(new 
CloseIgnoringInputStream(stream));
+      T obj = (T) jaxbUnmarshaller.get().unmarshal(new 
CloseIgnoringInputStream(stream));
       return obj;
     } catch (JAXBException e) {
       throw new CoderException(e);

Reply via email to