[
https://issues.apache.org/jira/browse/FLINK-10655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675760#comment-16675760
]
ASF GitHub Bot commented on FLINK-10655:
----------------------------------------
tillrohrmann closed pull request #7005: [FLINK-10655][rpc] fix
RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
URL: https://github.com/apache/flink/pull/7005
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index e906f62f65b..fb119453057 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -23,21 +23,17 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.TestLogger;
+
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import javax.tools.JavaCompiler;
-import javax.tools.ToolProvider;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
public class EnumSerializerUpgradeTest extends TestLogger {
@@ -87,7 +83,7 @@ public void checkDifferentFieldOrder() throws Exception {
private static TypeSerializerSchemaCompatibility
checkCompatibility(String enumSourceA, String enumSourceB)
throws IOException, ClassNotFoundException {
- ClassLoader classLoader = compileAndLoadEnum(
+ ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java",
enumSourceA);
EnumSerializer enumSerializer = new
EnumSerializer(classLoader.loadClass(ENUM_NAME));
@@ -103,7 +99,7 @@ private static TypeSerializerSchemaCompatibility
checkCompatibility(String enumS
snapshotBytes = outBuffer.toByteArray();
}
- ClassLoader classLoader2 = compileAndLoadEnum(
+ ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java",
enumSourceB);
TypeSerializerSnapshot restoredSnapshot;
@@ -118,29 +114,4 @@ private static TypeSerializerSchemaCompatibility
checkCompatibility(String enumS
EnumSerializer enumSerializer2 = new
EnumSerializer(classLoader2.loadClass(ENUM_NAME));
return
restoredSnapshot.resolveSchemaCompatibility(enumSerializer2);
}
-
- private static ClassLoader compileAndLoadEnum(File root, String
filename, String source) throws IOException {
- File file = writeSourceFile(root, filename, source);
-
- compileClass(file);
-
- return new URLClassLoader(
- new URL[]{root.toURI().toURL()},
- Thread.currentThread().getContextClassLoader());
- }
-
- private static File writeSourceFile(File root, String filename, String
source) throws IOException {
- File file = new File(root, filename);
- FileWriter fileWriter = new FileWriter(file);
-
- fileWriter.write(source);
- fileWriter.close();
-
- return file;
- }
-
- private static int compileClass(File sourceFile) {
- JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
- return compiler.run(null, null, null, "-proc:none",
sourceFile.getPath());
- }
}
diff --git
a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
new file mode 100644
index 00000000000..0688c1df156
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Utilities to create class loaders.
+ */
+public class ClassLoaderUtils {
+ public static URLClassLoader compileAndLoadJava(File root, String
filename, String source) throws
+ IOException {
+ File file = writeSourceFile(root, filename, source);
+
+ compileClass(file);
+
+ return new URLClassLoader(
+ new URL[]{root.toURI().toURL()},
+ Thread.currentThread().getContextClassLoader());
+ }
+
+ private static File writeSourceFile(File root, String filename, String
source) throws IOException {
+ File file = new File(root, filename);
+ FileWriter fileWriter = new FileWriter(file);
+
+ fileWriter.write(source);
+ fileWriter.close();
+
+ return file;
+ }
+
+ private static int compileClass(File sourceFile) {
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ return compiler.run(null, null, null, "-proc:none",
sourceFile.getPath());
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
index 7b9fb887670..486816de8e8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
@@ -203,12 +203,18 @@ private void readObject(ObjectInputStream ois) throws
IOException, ClassNotFound
try {
parameterTypes[i] = (Class<?>)
ois.readObject();
} catch (IOException e) {
+ StringBuilder incompleteMethod =
getIncompleteMethodString(i, 0);
throw new IOException("Could not
deserialize " + i + "th parameter type of method " +
- methodName + '.', e);
+ incompleteMethod + '.', e);
} catch (ClassNotFoundException e) {
- throw new ClassNotFoundException("Could
not deserialize " + i + "th " +
- "parameter type of method " +
methodName + ". This indicates that the parameter " +
- "type is not part of the system
class loader.", e);
+ // note: wrapping this CNFE into
another CNFE does not overwrite the Exception
+ // stored in the
ObjectInputStream (see ObjectInputStream#readSerialData)
+ // -> add a suppressed exception that
adds a more specific message
+ StringBuilder incompleteMethod =
getIncompleteMethodString(i, 0);
+ e.addSuppressed(new
ClassNotFoundException("Could not deserialize " + i + "th " +
+ "parameter type of method " +
incompleteMethod + ". This indicates that the parameter " +
+ "type is not part of the system
class loader."));
+ throw e;
}
}
@@ -221,17 +227,37 @@ private void readObject(ObjectInputStream ois) throws
IOException, ClassNotFound
try {
args[i] = ois.readObject();
} catch (IOException e) {
+ StringBuilder incompleteMethod
= getIncompleteMethodString(length, i);
throw new IOException("Could
not deserialize " + i + "th argument of method " +
- methodName + '.', e);
+ incompleteMethod + '.',
e);
} catch (ClassNotFoundException e) {
- throw new
ClassNotFoundException("Could not deserialize " + i + "th " +
- "argument of method " +
methodName + ". This indicates that the argument " +
- "type is not part of
the system class loader.", e);
+ // note: wrapping this CNFE
into another CNFE does not overwrite the Exception
+ // stored in the
ObjectInputStream (see ObjectInputStream#readSerialData)
+ // -> add a suppressed
exception that adds a more specific message
+ StringBuilder incompleteMethod
= getIncompleteMethodString(length, i);
+ e.addSuppressed(new
ClassNotFoundException("Could not deserialize " + i + "th " +
+ "argument of method " +
incompleteMethod + ". This indicates that the argument " +
+ "type is not part of
the system class loader."));
+ throw e;
}
}
} else {
args = null;
}
}
+
+ private StringBuilder getIncompleteMethodString(int
lastMethodTypeIdx, int lastArgumentIdx) {
+ StringBuilder incompleteMethod = new StringBuilder();
+ incompleteMethod.append(methodName).append('(');
+ for (int i = 0; i < lastMethodTypeIdx; ++i) {
+
incompleteMethod.append(parameterTypes[i].getCanonicalName());
+ if (i < lastArgumentIdx) {
+ incompleteMethod.append(":
").append(args[i]);
+ }
+ incompleteMethod.append(", ");
+ }
+ incompleteMethod.append("...)"); // some parameters
could not be deserialized
+ return incompleteMethod;
+ }
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
index c02278c5088..7c664ce0b32 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java
@@ -19,21 +19,78 @@
package org.apache.flink.runtime.classloading;
import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
import java.net.URL;
import java.net.URLClassLoader;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasItemInArray;
+import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
/**
- * Tests for classloading and class loder utilities.
+ * Tests for classloading and class loader utilities.
*/
public class ClassLoaderTest extends TestLogger {
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testMessageDecodingWithUnavailableClass() throws Exception {
+ final ClassLoader systemClassLoader =
getClass().getClassLoader();
+
+ final String className = "UserClass";
+ final URLClassLoader userClassLoader =
ClassLoaderUtils.compileAndLoadJava(
+ temporaryFolder.newFolder(),
+ className + ".java",
+ "import java.io.Serializable;\n"
+ + "public class " + className + " implements
Serializable {}");
+
+ RemoteRpcInvocation method = new RemoteRpcInvocation(
+ "test",
+ new Class<?>[] {
+ int.class,
+ Class.forName(className, false,
userClassLoader)},
+ new Object[] {
+ 1,
+ Class.forName(className, false,
userClassLoader).newInstance()});
+
+ SerializedValue<RemoteRpcInvocation> serializedMethod = new
SerializedValue<>(method);
+
+ expectedException.expect(ClassNotFoundException.class);
+ expectedException.expect(
+ allOf(
+ isA(ClassNotFoundException.class),
+ hasProperty("suppressed",
+ hasItemInArray(
+ allOf(
+
isA(ClassNotFoundException.class),
+ hasProperty("message",
+
containsString("Could not deserialize 1th parameter type of method test(int,
...).")))))));
+
+ RemoteRpcInvocation deserializedMethod =
serializedMethod.deserializeValue(systemClassLoader);
+ deserializedMethod.getMethodName();
+
+ userClassLoader.close();
+ }
+
@Test
public void testParentFirstClassLoading() throws Exception {
final ClassLoader parentClassLoader =
getClass().getClassLoader();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
> ------------------------------------------------------------------------------
>
> Key: FLINK-10655
> URL: https://issues.apache.org/jira/browse/FLINK-10655
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.4, 1.6.1, 1.7.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> {{RemoteRpcInvocation}} tries to give a more detailed
> {{ClassNotFoundException}} if the method type/argument deserialization fails.
> However, it turns out, once {{ObjectInputStream}} has received a
> {{ClassNotFoundException}}, it will not overwrite this anymore and we can
> therefore not provide a more detailed {{ClassNotFoundException}}.
> Instead, the least invasive solution would be to add a suppressed
> {{ClassNotFoundException}} to the existing one. While at it, we could also
> add more details, i.e. the successfully deserialized types and arguments.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)