This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca78ae75a1b1434109b2d91452dbaad7c3e88186
Author: Nico Kruber <n...@data-artisans.com>
AuthorDate: Tue Oct 23 18:47:02 2018 +0200

    [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting 
ObjectInputStream's ClassNotFoundException
---
 .../typeutils/base/EnumSerializerUpgradeTest.java  | 37 ++------------
 .../apache/flink/testutils/ClassLoaderUtils.java   | 59 ++++++++++++++++++++++
 .../runtime/rpc/messages/RemoteRpcInvocation.java  | 42 ++++++++++++---
 .../runtime/classloading/ClassLoaderTest.java      | 59 +++++++++++++++++++++-
 4 files changed, 155 insertions(+), 42 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index 6502eb3..2bcae45 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -23,21 +23,17 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.tools.JavaCompiler;
-import javax.tools.ToolProvider;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
 
 public class EnumSerializerUpgradeTest extends TestLogger {
 
@@ -87,7 +83,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
        private static CompatibilityResult checkCompatibility(String 
enumSourceA, String enumSourceB)
                throws IOException, ClassNotFoundException {
 
-               ClassLoader classLoader = compileAndLoadEnum(
+               ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava(
                        temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceA);
 
                EnumSerializer enumSerializer = new 
EnumSerializer(classLoader.loadClass(ENUM_NAME));
@@ -102,7 +98,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
                        snapshotBytes = outBuffer.toByteArray();
                }
 
-               ClassLoader classLoader2 = compileAndLoadEnum(
+               ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava(
                        temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceB);
 
                TypeSerializerConfigSnapshot restoredSnapshot;
@@ -116,29 +112,4 @@ public class EnumSerializerUpgradeTest extends TestLogger {
                EnumSerializer enumSerializer2 = new 
EnumSerializer(classLoader2.loadClass(ENUM_NAME));
                return enumSerializer2.ensureCompatibility(restoredSnapshot);
        }
-
-       private static ClassLoader compileAndLoadEnum(File root, String 
filename, String source) throws IOException {
-               File file = writeSourceFile(root, filename, source);
-
-               compileClass(file);
-
-               return new URLClassLoader(
-                       new URL[]{root.toURI().toURL()},
-                       Thread.currentThread().getContextClassLoader());
-       }
-
-       private static File writeSourceFile(File root, String filename, String 
source) throws IOException {
-               File file = new File(root, filename);
-               FileWriter fileWriter = new FileWriter(file);
-
-               fileWriter.write(source);
-               fileWriter.close();
-
-               return file;
-       }
-
-       private static int compileClass(File sourceFile) {
-               JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
-               return compiler.run(null, null, null, sourceFile.getPath());
-       }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java 
b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
new file mode 100644
index 0000000..0688c1d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Utilities to create class loaders.
+ */
+public class ClassLoaderUtils {
+       public static URLClassLoader compileAndLoadJava(File root, String 
filename, String source) throws
+               IOException {
+               File file = writeSourceFile(root, filename, source);
+
+               compileClass(file);
+
+               return new URLClassLoader(
+                       new URL[]{root.toURI().toURL()},
+                       Thread.currentThread().getContextClassLoader());
+       }
+
+       private static File writeSourceFile(File root, String filename, String 
source) throws IOException {
+               File file = new File(root, filename);
+               FileWriter fileWriter = new FileWriter(file);
+
+               fileWriter.write(source);
+               fileWriter.close();
+
+               return file;
+       }
+
+       private static int compileClass(File sourceFile) {
+               JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+               return compiler.run(null, null, null, "-proc:none", 
sourceFile.getPath());
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
index 7b9fb88..486816d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
@@ -203,12 +203,18 @@ public class RemoteRpcInvocation implements 
RpcInvocation, Serializable {
                                try {
                                        parameterTypes[i] = (Class<?>) 
ois.readObject();
                                } catch (IOException e) {
+                                       StringBuilder incompleteMethod = 
getIncompleteMethodString(i, 0);
                                        throw new IOException("Could not 
deserialize " + i + "th parameter type of method " +
-                                               methodName + '.', e);
+                                               incompleteMethod + '.', e);
                                } catch (ClassNotFoundException e) {
-                                       throw new ClassNotFoundException("Could 
not deserialize " + i + "th " +
-                                               "parameter type of method " + 
methodName + ". This indicates that the parameter " +
-                                               "type is not part of the system 
class loader.", e);
+                                       // note: wrapping this CNFE into 
another CNFE does not overwrite the Exception
+                                       //       stored in the 
ObjectInputStream (see ObjectInputStream#readSerialData)
+                                       // -> add a suppressed exception that 
adds a more specific message
+                                       StringBuilder incompleteMethod = 
getIncompleteMethodString(i, 0);
+                                       e.addSuppressed(new 
ClassNotFoundException("Could not deserialize " + i + "th " +
+                                               "parameter type of method " + 
incompleteMethod + ". This indicates that the parameter " +
+                                               "type is not part of the system 
class loader."));
+                                       throw e;
                                }
                        }
 
@@ -221,17 +227,37 @@ public class RemoteRpcInvocation implements 
RpcInvocation, Serializable {
                                        try {
                                                args[i] = ois.readObject();
                                        } catch (IOException e) {
+                                               StringBuilder incompleteMethod 
= getIncompleteMethodString(length, i);
                                                throw new IOException("Could 
not deserialize " + i + "th argument of method " +
-                                                       methodName + '.', e);
+                                                       incompleteMethod + '.', 
e);
                                        } catch (ClassNotFoundException e) {
-                                               throw new 
ClassNotFoundException("Could not deserialize " + i + "th " +
-                                                       "argument of method " + 
methodName + ". This indicates that the argument " +
-                                                       "type is not part of 
the system class loader.", e);
+                                               // note: wrapping this CNFE 
into another CNFE does not overwrite the Exception
+                                               //       stored in the 
ObjectInputStream (see ObjectInputStream#readSerialData)
+                                               // -> add a suppressed 
exception that adds a more specific message
+                                               StringBuilder incompleteMethod 
= getIncompleteMethodString(length, i);
+                                               e.addSuppressed(new 
ClassNotFoundException("Could not deserialize " + i + "th " +
+                                                       "argument of method " + 
incompleteMethod + ". This indicates that the argument " +
+                                                       "type is not part of 
the system class loader."));
+                                               throw e;
                                        }
                                }
                        } else {
                                args = null;
                        }
                }
+
+               private StringBuilder getIncompleteMethodString(int 
lastMethodTypeIdx, int lastArgumentIdx) {
+                       StringBuilder incompleteMethod = new StringBuilder();
+                       incompleteMethod.append(methodName).append('(');
+                       for (int i = 0; i < lastMethodTypeIdx; ++i) {
+                               
incompleteMethod.append(parameterTypes[i].getCanonicalName());
+                               if (i < lastArgumentIdx) {
+                                       incompleteMethod.append(": 
").append(args[i]);
+                               }
+                               incompleteMethod.append(", ");
+                       }
+                       incompleteMethod.append("...)"); // some parameters 
could not be deserialized
+                       return incompleteMethod;
+               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
index c02278c..7c664ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
@@ -19,21 +19,78 @@
 package org.apache.flink.runtime.classloading;
 
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 
 import java.net.URL;
 import java.net.URLClassLoader;
 
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasItemInArray;
+import static org.hamcrest.Matchers.hasProperty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 /**
- * Tests for classloading and class loder utilities.
+ * Tests for classloading and class loader utilities.
  */
 public class ClassLoaderTest extends TestLogger {
 
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
+       @Test
+       public void testMessageDecodingWithUnavailableClass() throws Exception {
+               final ClassLoader systemClassLoader = 
getClass().getClassLoader();
+
+               final String className = "UserClass";
+               final URLClassLoader userClassLoader = 
ClassLoaderUtils.compileAndLoadJava(
+                       temporaryFolder.newFolder(),
+                       className + ".java",
+                       "import java.io.Serializable;\n"
+                               + "public class " + className + " implements 
Serializable {}");
+
+               RemoteRpcInvocation method = new RemoteRpcInvocation(
+                       "test",
+                       new Class<?>[] {
+                               int.class,
+                               Class.forName(className, false, 
userClassLoader)},
+                       new Object[] {
+                               1,
+                               Class.forName(className, false, 
userClassLoader).newInstance()});
+
+               SerializedValue<RemoteRpcInvocation> serializedMethod = new 
SerializedValue<>(method);
+
+               expectedException.expect(ClassNotFoundException.class);
+               expectedException.expect(
+                       allOf(
+                               isA(ClassNotFoundException.class),
+                               hasProperty("suppressed",
+                                       hasItemInArray(
+                                               allOf(
+                                                       
isA(ClassNotFoundException.class),
+                                                       hasProperty("message",
+                                                               
containsString("Could not deserialize 1th parameter type of method test(int, 
...).")))))));
+
+               RemoteRpcInvocation deserializedMethod = 
serializedMethod.deserializeValue(systemClassLoader);
+               deserializedMethod.getMethodName();
+
+               userClassLoader.close();
+       }
+
        @Test
        public void testParentFirstClassLoading() throws Exception {
                final ClassLoader parentClassLoader = 
getClass().getClassLoader();

Reply via email to