[FLINK-6515] [runtime] Fix classloading of JavaSerializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc3512ee Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc3512ee Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc3512ee Branch: refs/heads/release-1.3 Commit: cc3512ee9bbcc965278b43642cc9481f77027c4f Parents: 95fd2d3 Author: Stephan Ewen <[email protected]> Authored: Wed May 10 11:20:07 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed May 10 21:08:37 2017 +0200 ---------------------------------------------------------------------- flink-runtime/pom.xml | 10 +- .../flink/runtime/state/JavaSerializer.java | 17 ++- .../flink/runtime/state/JavaSerializerTest.java | 116 +++++++++++++++++ .../flink/core/testutils/CommonTestUtils.java | 124 +++++++++++++++++++ 4 files changed, 257 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cc3512ee/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index a6b9513..2b42cc5 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -162,7 +162,7 @@ under the License. <version>${project.version}</version> </dependency> - <!-- core dependencies --> + <!-- test dependencies --> <dependency> <groupId>org.apache.flink</groupId> @@ -172,6 +172,14 @@ under the License. </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>${curator.version}</version> http://git-wip-us.apache.org/repos/asf/flink/blob/cc3512ee/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java index d49b1d2..5252b3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java @@ -24,12 +24,12 @@ import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; import java.io.Serializable; -@SuppressWarnings("serial") @Internal final class JavaSerializer<T extends Serializable> extends TypeSerializerSingleton<T> { @@ -47,11 +47,10 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet @Override public T copy(T from) { - try { - return InstantiationUtil.clone(from); + return InstantiationUtil.clone(from, Thread.currentThread().getContextClassLoader()); } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException("Could not copy instance of " + from + '.', e); + throw new FlinkRuntimeException("Could not copy element via serialization: " + from, e); } } @@ -62,7 +61,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet @Override public int getLength() { - return 0; + return -1; } @Override @@ -74,7 +73,8 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet public T deserialize(DataInputView source) throws IOException { try { return InstantiationUtil.deserializeObject( - new DataInputViewStream(source), Thread.currentThread().getContextClassLoader()); + new DataInputViewStream(source), + Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IOException("Could not deserialize object.", e); } @@ -87,9 +87,8 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet @Override public void copy(DataInputView source, DataOutputView target) throws IOException { - int size = source.readInt(); - target.writeInt(size); - target.write(source, size); + T tmp = deserialize(source); + serialize(tmp, target); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/cc3512ee/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java new file mode 100644 index 0000000..de6fbce --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.Serializable; +import java.net.URL; +import java.net.URLClassLoader; + +import static org.junit.Assert.*; + +/** + * A test that verifies that the {@link JavaSerializer} properly handles class loading. + */ +public class JavaSerializerTest extends SerializerTestBase<Serializable> { + + /** Class loader for the object that is not in the test class path */ + private static final ClassLoader CLASS_LOADER = + new URLClassLoader(new URL[0], JavaSerializerTest.class.getClassLoader()); + + /** An object that is not in the test class path */ + private static final Serializable OBJECT_OUT_OF_CLASSPATH = + CommonTestUtils.createObjectForClassNotInClassPath(CLASS_LOADER); + + // ------------------------------------------------------------------------ + + private ClassLoader originalClassLoader; + + @Before + public void setupClassLoader() { + originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(CLASS_LOADER); + } + + @After + public void restoreOriginalClassLoader() { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + + // ------------------------------------------------------------------------ + + @Test + public void guardTest() { + // make sure that this test's assumptions hold + try { + Class.forName(OBJECT_OUT_OF_CLASSPATH.getClass().getName()); + fail("Test ineffective: The test class that should not be on the classpath is actually on the classpath."); + } catch (ClassNotFoundException e) { + // expected + } + } + + // ------------------------------------------------------------------------ + + @Override + protected TypeSerializer<Serializable> createSerializer() { + Thread.currentThread().setContextClassLoader(CLASS_LOADER); + return new JavaSerializer<>(); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<Serializable> getTypeClass() { + return Serializable.class; + } + + @Override + protected Serializable[] getTestData() { + return new Serializable[] { + new Integer(42), + new File("/some/path/that/I/made/up"), + + // an object that is not in the classpath + OBJECT_OUT_OF_CLASSPATH, + + // an object that is in the classpath with a nested object not in the classpath + new Tuple1<>(OBJECT_OUT_OF_CLASSPATH) + }; + } + + // ------------------------------------------------------------------------ + + @Override + public void testInstantiate() { + // this serializer does not support instantiation + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cc3512ee/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java index 639b065..cf2bb7f 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java @@ -29,7 +29,12 @@ import java.io.FileWriter; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.Serializable; import java.lang.reflect.Field; +import java.security.CodeSource; +import java.security.Permissions; +import java.security.ProtectionDomain; +import java.security.cert.Certificate; import java.util.Map; import static org.junit.Assert.fail; @@ -178,4 +183,123 @@ public class CommonTestUtils { throw new RuntimeException(e1); } } + + // ------------------------------------------------------------------------ + // Testing of objects not in the application class loader + // ------------------------------------------------------------------------ + + /** + * Creates a new class that is not part of the classpath that the current JVM uses, and + * instantiates it. + * + * <p>This method uses {@link #createClassNotInClassPath(ClassLoader)} to define the new class. + * + * @param targetClassLoader The class loader to attach the class to + * @return The object instantiated from the newly defined class. + */ + public static Serializable createObjectForClassNotInClassPath(ClassLoader targetClassLoader) { + try { + Class<? extends Serializable> clazz = createClassNotInClassPath(targetClassLoader); + return clazz.newInstance(); + } + catch (Exception e) { + throw new AssertionError("test setup broken", e); + } + } + + /** + * Creates a new class that is not part of the classpath that the current JVM uses. + * The class is ad-hoc defined and attached to the given ClassLoader. + * + * @param targetClassLoader The class loader to attach the class to + * @return The newly defined class + */ + public static Class<? extends Serializable> createClassNotInClassPath(ClassLoader targetClassLoader) { + final byte[] classData = {-54, -2, -70, -66, 0, 0, 0, 51, 0, 65, 10, 0, 15, 0, 43, 7, 0, 44, + 10, 0, 2, 0, 43, 10, 0, 2, 0, 45, 9, 0, 7, 0, 46, 10, 0, 15, 0, 47, 7, 0, 48, 7, 0, + 49, 10, 0, 8, 0, 43, 8, 0, 50, 10, 0, 8, 0, 51, 10, 0, 8, 0, 52, 10, 0, 8, 0, 53, 10, + 0, 8, 0, 54, 7, 0, 55, 7, 0, 56, 1, 0, 16, 115, 101, 114, 105, 97, 108, 86, 101, 114, + 115, 105, 111, 110, 85, 73, 68, 1, 0, 1, 74, 1, 0, 13, 67, 111, 110, 115, 116, 97, 110, + 116, 86, 97, 108, 117, 101, 5, -1, -1, -1, -1, -1, -1, -1, -3, 1, 0, 6, 114, 97, 110, + 100, 111, 109, 1, 0, 6, 60, 105, 110, 105, 116, 62, 1, 0, 3, 40, 41, 86, 1, 0, 4, 67, + 111, 100, 101, 1, 0, 15, 76, 105, 110, 101, 78, 117, 109, 98, 101, 114, 84, 97, 98, 108, + 101, 1, 0, 18, 76, 111, 99, 97, 108, 86, 97, 114, 105, 97, 98, 108, 101, 84, 97, 98, + 108, 101, 1, 0, 4, 116, 104, 105, 115, 1, 0, 35, 76, 111, 114, 103, 47, 97, 112, 97, 99, + 104, 101, 47, 102, 108, 105, 110, 107, 47, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108, + 105, 122, 97, 98, 108, 101, 59, 1, 0, 6, 101, 113, 117, 97, 108, 115, 1, 0, 21, 40, 76, + 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 79, 98, 106, 101, 99, 116, 59, 41, 90, 1, 0, + 1, 111, 1, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 79, 98, 106, 101, 99, + 116, 59, 1, 0, 4, 116, 104, 97, 116, 1, 0, 13, 83, 116, 97, 99, 107, 77, 97, 112, 84, 97, + 98, 108, 101, 7, 0, 48, 1, 0, 8, 104, 97, 115, 104, 67, 111, 100, 101, 1, 0, 3, 40, 41, + 73, 1, 0, 8, 116, 111, 83, 116, 114, 105, 110, 103, 1, 0, 20, 40, 41, 76, 106, 97, 118, 97, + 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 1, 0, 10, 83, 111, 117, 114, + 99, 101, 70, 105, 108, 101, 1, 0, 21, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108, 105, + 122, 97, 98, 108, 101, 46, 106, 97, 118, 97, 12, 0, 23, 0, 24, 1, 0, 16, 106, 97, 118, 97, + 47, 117, 116, 105, 108, 47, 82, 97, 110, 100, 111, 109, 12, 0, 57, 0, 58, 12, 0, 22, 0, 18, + 12, 0, 59, 0, 60, 1, 0, 33, 111, 114, 103, 47, 97, 112, 97, 99, 104, 101, 47, 102, 108, 105, + 110, 107, 47, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108, 105, 122, 97, 98, 108, 101, 1, + 0, 23, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 66, 117, + 105, 108, 100, 101, 114, 1, 0, 24, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108, 105, 122, + 97, 98, 108, 101, 123, 114, 97, 110, 100, 111, 109, 61, 12, 0, 61, 0, 62, 12, 0, 61, 0, 63, + 12, 0, 61, 0, 64, 12, 0, 39, 0, 40, 1, 0, 16, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, + 79, 98, 106, 101, 99, 116, 1, 0, 20, 106, 97, 118, 97, 47, 105, 111, 47, 83, 101, 114, 105, + 97, 108, 105, 122, 97, 98, 108, 101, 1, 0, 8, 110, 101, 120, 116, 76, 111, 110, 103, 1, 0, + 3, 40, 41, 74, 1, 0, 8, 103, 101, 116, 67, 108, 97, 115, 115, 1, 0, 19, 40, 41, 76, 106, 97, + 118, 97, 47, 108, 97, 110, 103, 47, 67, 108, 97, 115, 115, 59, 1, 0, 6, 97, 112, 112, 101, + 110, 100, 1, 0, 45, 40, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, + 110, 103, 59, 41, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, + 103, 66, 117, 105, 108, 100, 101, 114, 59, 1, 0, 28, 40, 74, 41, 76, 106, 97, 118, 97, 47, + 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 66, 117, 105, 108, 100, 101, 114, 59, 1, + 0, 28, 40, 67, 41, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, + 103, 66, 117, 105, 108, 100, 101, 114, 59, 0, 33, 0, 7, 0, 15, 0, 1, 0, 16, 0, 2, 0, 26, 0, + 17, 0, 18, 0, 1, 0, 19, 0, 0, 0, 2, 0, 20, 0, 18, 0, 22, 0, 18, 0, 0, 0, 4, 0, 1, 0, 23, 0, + 24, 0, 1, 0, 25, 0, 0, 0, 69, 0, 3, 0, 1, 0, 0, 0, 19, 42, -73, 0, 1, 42, -69, 0, 2, 89, -73, + 0, 3, -74, 0, 4, -75, 0, 5, -79, 0, 0, 0, 2, 0, 26, 0, 0, 0, 14, 0, 3, 0, 0, 0, 30, 0, 4, 0, + 31, 0, 18, 0, 32, 0, 27, 0, 0, 0, 12, 0, 1, 0, 0, 0, 19, 0, 28, 0, 29, 0, 0, 0, 1, 0, 30, 0, + 31, 0, 1, 0, 25, 0, 0, 0, -116, 0, 4, 0, 3, 0, 0, 0, 47, 42, 43, -90, 0, 5, 4, -84, 43, -58, + 0, 14, 42, -74, 0, 6, 43, -74, 0, 6, -91, 0, 5, 3, -84, 43, -64, 0, 7, 77, 42, -76, 0, 5, 44, + -76, 0, 5, -108, -102, 0, 7, 4, -89, 0, 4, 3, -84, 0, 0, 0, 3, 0, 26, 0, 0, 0, 18, 0, 4, 0, 0, + 0, 36, 0, 7, 0, 37, 0, 24, 0, 39, 0, 29, 0, 40, 0, 27, 0, 0, 0, 32, 0, 3, 0, 0, 0, 47, 0, 28, + 0, 29, 0, 0, 0, 0, 0, 47, 0, 32, 0, 33, 0, 1, 0, 29, 0, 18, 0, 34, 0, 29, 0, 2, 0, 35, 0, 0, + 0, 13, 0, 5, 7, 14, 1, -4, 0, 20, 7, 0, 36, 64, 1, 0, 1, 0, 37, 0, 38, 0, 1, 0, 25, 0, 0, 0, + 56, 0, 5, 0, 1, 0, 0, 0, 14, 42, -76, 0, 5, 42, -76, 0, 5, 16, 32, 125, -125, -120, -84, 0, 0, + 0, 2, 0, 26, 0, 0, 0, 6, 0, 1, 0, 0, 0, 46, 0, 27, 0, 0, 0, 12, 0, 1, 0, 0, 0, 14, 0, 28, 0, + 29, 0, 0, 0, 1, 0, 39, 0, 40, 0, 1, 0, 25, 0, 0, 0, 70, 0, 3, 0, 1, 0, 0, 0, 28, -69, 0, 8, + 89, -73, 0, 9, 18, 10, -74, 0, 11, 42, -76, 0, 5, -74, 0, 12, 16, 125, -74, 0, 13, -74, 0, 14, + -80, 0, 0, 0, 2, 0, 26, 0, 0, 0, 6, 0, 1, 0, 0, 0, 51, 0, 27, 0, 0, 0, 12, 0, 1, 0, 0, 0, 28, + 0, 28, 0, 29, 0, 0, 0, 1, 0, 41, 0, 0, 0, 2, 0, 42,}; + + try { + // define a class into the classloader + Class<?> clazz = getUnsafe().defineClass( + "org.apache.flink.TestSerializable", + classData, 0, classData.length, + targetClassLoader, + new ProtectionDomain(new CodeSource(null, (Certificate[]) null), new Permissions())); + + return clazz.asSubclass(Serializable.class); + } + catch (Exception e) { + throw new AssertionError("test setup broken", e); + } + } + + @SuppressWarnings("restriction") + private static sun.misc.Unsafe getUnsafe() { + try { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + return (sun.misc.Unsafe) unsafeField.get(null); + } catch (SecurityException e) { + throw new RuntimeException("Could not access the sun.misc.Unsafe handle, permission denied by security manager.", e); + } catch (NoSuchFieldException e) { + throw new RuntimeException("The static handle field in sun.misc.Unsafe was not found."); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Bug: Illegal argument reflection access for static field.", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Access to sun.misc.Unsafe is forbidden by the runtime.", e); + } catch (Throwable t) { + throw new RuntimeException("Unclassified error while trying to access the sun.misc.Unsafe handle.", t); + } + } }
