Repository: beam Updated Branches: refs/heads/master 535761a74 -> 8a1fab1d1
Use a new ReflectData for each AvroCoder instance. This addresses an issue where Avro might have cached a class from a different ClassLoader. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e335081 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e335081 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e335081 Branch: refs/heads/master Commit: 7e33508187f49c359ce585ae173f3aee5658eb35 Parents: 535761a Author: Kenneth Knowles <[email protected]> Authored: Sat Apr 29 14:06:37 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon May 1 09:35:17 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/coders/AvroCoder.java | 72 ++++++++++++++------ .../apache/beam/sdk/coders/AvroCoderTest.java | 65 ++++++++++++++++++ .../beam/sdk/coders/AvroCoderTestPojo.java | 51 ++++++++++++++ 3 files changed, 167 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7e335081/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 1d7cce5..1e01f1a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.coders; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -114,7 +115,7 @@ public class AvroCoder<T> extends CustomCoder<T> { * @param <T> the element type */ public static <T> AvroCoder<T> of(Class<T> clazz) { - return new
AvroCoder<>(clazz, ReflectData.get().getSchema(clazz)); + return new AvroCoder<>(clazz, new ReflectData(clazz.getClassLoader()).getSchema(clazz)); } /** @@ -198,6 +199,25 @@ public class AvroCoder<T> extends CustomCoder<T> { } } + /** + * A {@link Serializable} object that lazily supplies a {@link ReflectData} built from the + * appropriate {@link ClassLoader} for the type encoded by this {@link AvroCoder}. + */ + private static class SerializableReflectDataSupplier + implements Serializable, Supplier<ReflectData> { + + private final Class<?> clazz; + + private SerializableReflectDataSupplier(Class<?> clazz) { + this.clazz = clazz; + } + + @Override + public ReflectData get() { + return new ReflectData(clazz.getClassLoader()); + } + } + // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe, // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use // an inner coder. @@ -206,6 +226,9 @@ public class AvroCoder<T> extends CustomCoder<T> { private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer; private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader; + // Lazily re-instantiated after deserialization + private final Supplier<ReflectData> reflectData; + protected AvroCoder(Class<T> type, Schema schema) { this.type = type; this.schemaSupplier = new SerializableSchemaSupplier(schema); @@ -217,26 +240,33 @@ public class AvroCoder<T> extends CustomCoder<T> { this.decoder = new EmptyOnDeserializationThreadLocal<>(); this.encoder = new EmptyOnDeserializationThreadLocal<>(); - // Reader and writer are allocated once per thread and are "final" for thread-local Coder - // instance. - this.reader = new EmptyOnDeserializationThreadLocal<DatumReader<T>>() { - private final AvroCoder<T> myCoder = AvroCoder.this; - @Override - public DatumReader<T> initialValue() { - return myCoder.getType().equals(GenericRecord.class) - ? 
new GenericDatumReader<T>(myCoder.getSchema()) - : new ReflectDatumReader<T>(myCoder.getSchema()); - } - }; - this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() { - private final AvroCoder<T> myCoder = AvroCoder.this; - @Override - public DatumWriter<T> initialValue() { - return myCoder.getType().equals(GenericRecord.class) - ? new GenericDatumWriter<T>(myCoder.getSchema()) - : new ReflectDatumWriter<T>(myCoder.getSchema()); - } - }; + this.reflectData = Suppliers.memoize(new SerializableReflectDataSupplier(getType())); + + // Reader and writer are allocated once per thread per Coder + this.reader = + new EmptyOnDeserializationThreadLocal<DatumReader<T>>() { + private final AvroCoder<T> myCoder = AvroCoder.this; + + @Override + public DatumReader<T> initialValue() { + return myCoder.getType().equals(GenericRecord.class) + ? new GenericDatumReader<T>(myCoder.getSchema()) + : new ReflectDatumReader<T>( + myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get()); + } + }; + + this.writer = + new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() { + private final AvroCoder<T> myCoder = AvroCoder.this; + + @Override + public DatumWriter<T> initialValue() { + return myCoder.getType().equals(GenericRecord.class) + ? 
new GenericDatumWriter<T>(myCoder.getSchema()) + : new ReflectDatumWriter<T>(myCoder.getSchema(), myCoder.reflectData.get()); + } + }; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7e335081/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index cbc4d24..e1d5359 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -26,8 +26,10 @@ import static org.junit.Assert.fail; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.google.common.io.ByteStreams; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; @@ -64,6 +66,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -159,8 +163,69 @@ public class AvroCoderTest { CoderProperties.coderConsistentWithEquals(copy, pojo, equalPojo); CoderProperties.coderConsistentWithEquals(coder, pojo, otherPojo); CoderProperties.coderConsistentWithEquals(copy, pojo, otherPojo); + } + + /** + * A classloader that intercepts loading of Pojo and makes a new one. 
+ */ + private static class InterceptingUrlClassLoader extends ClassLoader { + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException { + if (name.equals(AvroCoderTestPojo.class.getName())) { + // Quite a hack? + try { + String classAsResource = name.replace('.', '/') + ".class"; + byte[] classBytes = + ByteStreams.toByteArray(getParent().getResourceAsStream(classAsResource)); + return defineClass(name, classBytes, 0, classBytes.length); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + return getParent().loadClass(name); + } + } + } + + /** + * Tests that {@link AvroCoder} works around issues in Avro where cache classes might be + * from the wrong ClassLoader, causing confusing "Cannot cast X to X" error messages. + */ + @Test + public void testTwoClassLoaders() throws Exception { + ClassLoader loader1 = new InterceptingUrlClassLoader(); + ClassLoader loader2 = new InterceptingUrlClassLoader(); + + Class<?> pojoClass1 = loader1.loadClass(AvroCoderTestPojo.class.getName()); + Class<?> pojoClass2 = loader2.loadClass(AvroCoderTestPojo.class.getName()); + + Object pojo1 = InstanceBuilder.ofType(pojoClass1).withArg(String.class, "hello").build(); + Object pojo2 = InstanceBuilder.ofType(pojoClass2).withArg(String.class, "goodbye").build(); + + // Confirm incompatibility + try { + pojoClass2.cast(pojo1); + fail("Expected ClassCastException; without it, this test is vacuous"); + } catch (ClassCastException e) { + // g2g + } + + // The first coder is expected to populate the Avro SpecificData cache + // The second coder is expected to be corrupted if the caching is done wrong. 
+ AvroCoder<Object> avroCoder1 = (AvroCoder) AvroCoder.of(pojoClass1); + AvroCoder<Object> avroCoder2 = (AvroCoder) AvroCoder.of(pojoClass2); + + Object cloned1 = CoderUtils.clone(avroCoder1, pojo1); + Object cloned2 = CoderUtils.clone(avroCoder2, pojo2); + + Class<?> class1 = cloned1.getClass(); + Class<?> class2 = cloned2.getClass(); + // Confirming that the uncorrupted coder is fine + pojoClass1.cast(cloned1); + // Confirmed to fail prior to the fix + pojoClass2.cast(cloned2); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7e335081/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java new file mode 100644 index 0000000..dd5d419 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTestPojo.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.coders; + +import com.google.common.base.MoreObjects; +import java.util.Objects; + +/** A Pojo at the top level for use in tests. */ +class AvroCoderTestPojo { + + public String text; + + // Empty constructor required for Avro decoding. + @SuppressWarnings("unused") + public AvroCoderTestPojo() { + } + + public AvroCoderTestPojo(String text) { + this.text = text; + } + + @Override + public boolean equals(Object other) { + return (other instanceof AvroCoderTestPojo) && ((AvroCoderTestPojo) other).text.equals(text); + } + + @Override + public int hashCode() { + return Objects.hash(AvroCoderTestPojo.class, text); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("text", text).toString(); + } +}
