Repository: beam Updated Branches: refs/heads/master 9db5f746a -> daed01a69
Use a ThreadLocal for Marshaller/Unmarshaller in JAXBCoder This allows reuse of thread-unsafe marshallers and unmarshallers while encoding elements, while the coder remains thread-safe. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf0b990b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf0b990b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf0b990b Branch: refs/heads/master Commit: cf0b990b0336f46b4d4775c93e86bd2310d622b5 Parents: e1ee05e Author: Kai Jiang <[email protected]> Authored: Thu Jan 19 05:06:13 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Mon Jan 23 11:46:48 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/coders/JAXBCoder.java | 36 +++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cf0b990b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java index 0a4f9cc..ea636fc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java @@ -30,6 +30,7 @@ import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; import javax.xml.bind.Unmarshaller; import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal; import org.apache.beam.sdk.util.Structs; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.sdk.values.TypeDescriptor; @@ -45,6 +46,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> { private final Class<T> jaxbClass; private final 
TypeDescriptor<T> typeDescriptor; private transient volatile JAXBContext jaxbContext; + private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller; + private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller; public Class<T> getJAXBClass() { return jaxbClass; @@ -53,6 +56,28 @@ public class JAXBCoder<T> extends AtomicCoder<T> { private JAXBCoder(Class<T> jaxbClass) { this.jaxbClass = jaxbClass; this.typeDescriptor = TypeDescriptor.of(jaxbClass); + this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() { + @Override + protected Marshaller initialValue() { + try { + JAXBContext jaxbContext = getContext(); + return jaxbContext.createMarshaller(); + } catch (JAXBException e) { + throw new RuntimeException("Error when creating marshaller from JAXB Context.", e); + } + } + }; + this.jaxbUnmarshaller = new EmptyOnDeserializationThreadLocal<Unmarshaller>() { + @Override + protected Unmarshaller initialValue() { + try { + JAXBContext jaxbContext = getContext(); + return jaxbContext.createUnmarshaller(); + } catch (Exception e) { + throw new RuntimeException("Error when creating unmarshaller from JAXB Context.", e); + } + } + }; } /** @@ -68,9 +93,6 @@ public class JAXBCoder<T> extends AtomicCoder<T> { public void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException { try { - JAXBContext jaxbContext = getContext(); - // TODO: Consider caching in a ThreadLocal if this impacts performance - Marshaller jaxbMarshaller = jaxbContext.createMarshaller(); if (!context.isWholeStream) { try { long size = getEncodedElementByteSize(value, Context.OUTER); @@ -83,7 +105,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> { } } - jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream)); + jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream)); } catch (JAXBException e) { throw new CoderException(e); } @@ -92,17 +114,13 @@ public class JAXBCoder<T> extends 
AtomicCoder<T> { @Override public T decode(InputStream inStream, Context context) throws CoderException, IOException { try { - JAXBContext jaxbContext = getContext(); - // TODO: Consider caching in a ThreadLocal if this impacts performance - Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller(); - InputStream stream = inStream; if (!context.isWholeStream) { long limit = VarInt.decodeLong(inStream); stream = ByteStreams.limit(inStream, limit); } @SuppressWarnings("unchecked") - T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream)); + T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream)); return obj; } catch (JAXBException e) { throw new CoderException(e);
