This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e1e84e9 [FLINK-26347][rpc] Using system classloader in
deserialization of RemoteRpcInvocation
e1e84e9 is described below
commit e1e84e96526429d6e6735e0f969a20f2fb7ae0e2
Author: Yangze Guo <[email protected]>
AuthorDate: Tue Mar 1 14:31:30 2022 +0800
[FLINK-26347][rpc] Using system classloader in deserialization of
RemoteRpcInvocation
This closes #18935.
---
.../runtime/rpc/messages/RemoteRpcInvocation.java | 242 +++++++++++++--------
1 file changed, 147 insertions(+), 95 deletions(-)
diff --git
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
index 7b8a460..685ab56 100644
---
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
+++
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc.messages;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -35,10 +36,8 @@ import java.io.Serializable;
public class RemoteRpcInvocation implements RpcInvocation, Serializable {
private static final long serialVersionUID = 1L;
- private String declaringClassName;
- private String methodName;
- private Class<?>[] parameterTypes;
- private Object[] args;
+ // Wrap the invocation information to ease the serialization.
+ private RemoteRpcInvocation.MethodInvocation methodInvocation;
private transient String toString;
@@ -61,29 +60,27 @@ public class RemoteRpcInvocation implements RpcInvocation,
Serializable {
+ "be serializable for remote rpc calls.");
}
}
- this.declaringClassName = declaringClassName;
- this.methodName = methodName;
- this.parameterTypes = Preconditions.checkNotNull(parameterTypes);
- this.args = args;
+ methodInvocation =
+ new MethodInvocation(declaringClassName, methodName,
parameterTypes, args);
}
@Override
public String getMethodName() {
- return methodName;
+ return methodInvocation.getMethodName();
}
private String getDeclaringClassName() {
- return declaringClassName;
+ return methodInvocation.getDeclaringClassName();
}
@Override
public Class<?>[] getParameterTypes() {
- return parameterTypes;
+ return methodInvocation.getParameterTypes();
}
@Override
public Object[] getArgs() {
- return args;
+ return methodInvocation.getArgs();
}
@Override
@@ -108,129 +105,184 @@ public class RemoteRpcInvocation implements
RpcInvocation, Serializable {
// -------------------------------------------------------------------
private void writeObject(ObjectOutputStream oos) throws IOException {
- oos.writeUTF(declaringClassName);
- oos.writeUTF(methodName);
+ // Translate it to byte array so that we can deserialize classes which
cannot be found in
+ // akka class loader.
+ byte[] bytes = InstantiationUtil.serializeObject(methodInvocation);
+ oos.writeObject(bytes);
+ }
+
+ private void readObject(ObjectInputStream ois) throws IOException,
ClassNotFoundException {
+ byte[] bytes = (byte[]) ois.readObject();
+ methodInvocation =
+ InstantiationUtil.deserializeObject(bytes,
ClassLoader.getSystemClassLoader());
+ }
+
+ // -------------------------------------------------------------------
+ // Utility classes
+ // -------------------------------------------------------------------
+
+ /** Wrapper class for the method invocation information. */
+ private static final class MethodInvocation implements Serializable {
+ private static final long serialVersionUID = 1L;
- oos.writeInt(parameterTypes.length);
+ private String declaringClassName;
+ private String methodName;
+ private Class<?>[] parameterTypes;
+ private Object[] args;
- for (Class<?> parameterType : parameterTypes) {
- oos.writeObject(parameterType);
+ private MethodInvocation(
+ final String declaringClassName,
+ final String methodName,
+ final Class<?>[] parameterTypes,
+ final Object[] args) {
+ this.declaringClassName = declaringClassName;
+ this.methodName = methodName;
+ this.parameterTypes = Preconditions.checkNotNull(parameterTypes);
+ this.args = args;
}
- if (args != null) {
- oos.writeBoolean(true);
+ String getDeclaringClassName() {
+ return declaringClassName;
+ }
- for (int i = 0; i < args.length; i++) {
- try {
- oos.writeObject(args[i]);
- } catch (IOException e) {
- throw new IOException(
- "Could not serialize "
- + i
- + "th argument of method "
- + methodName
- + ". This indicates that the argument type
"
- + args.getClass().getName()
- + " is not serializable. Arguments have to
"
- + "be serializable for remote rpc calls.",
- e);
- }
- }
- } else {
- oos.writeBoolean(false);
+ String getMethodName() {
+ return methodName;
}
- }
- private void readObject(ObjectInputStream ois) throws IOException,
ClassNotFoundException {
- declaringClassName = ois.readUTF();
- methodName = ois.readUTF();
+ Class<?>[] getParameterTypes() {
+ return parameterTypes;
+ }
+
+ Object[] getArgs() {
+ return args;
+ }
- int length = ois.readInt();
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ oos.writeUTF(declaringClassName);
+ oos.writeUTF(methodName);
- parameterTypes = new Class<?>[length];
+ oos.writeInt(parameterTypes.length);
- for (int i = 0; i < length; i++) {
- try {
- parameterTypes[i] = (Class<?>) ois.readObject();
- } catch (IOException e) {
- StringBuilder incompleteMethod = getIncompleteMethodString(i,
0);
- throw new IOException(
- "Could not deserialize "
- + i
- + "th parameter type of method "
- + incompleteMethod
- + '.',
- e);
- } catch (ClassNotFoundException e) {
- // note: wrapping this CNFE into another CNFE does not
overwrite the Exception
- // stored in the ObjectInputStream (see
ObjectInputStream#readSerialData)
- // -> add a suppressed exception that adds a more specific
message
- StringBuilder incompleteMethod = getIncompleteMethodString(i,
0);
- e.addSuppressed(
- new ClassNotFoundException(
- "Could not deserialize "
+ for (Class<?> parameterType : parameterTypes) {
+ oos.writeObject(parameterType);
+ }
+
+ if (args != null) {
+ oos.writeBoolean(true);
+
+ for (int i = 0; i < args.length; i++) {
+ try {
+ oos.writeObject(args[i]);
+ } catch (IOException e) {
+ throw new IOException(
+ "Could not serialize "
+ i
- + "th "
- + "parameter type of method "
- + incompleteMethod
- + ". This indicates that the parameter
"
- + "type is not part of the system
class loader."));
- throw e;
+ + "th argument of method "
+ + methodName
+ + ". This indicates that the argument
type "
+ + args.getClass().getName()
+ + " is not serializable. Arguments
have to "
+ + "be serializable for remote rpc
calls.",
+ e);
+ }
+ }
+ } else {
+ oos.writeBoolean(false);
}
}
- boolean hasArgs = ois.readBoolean();
+ private void readObject(ObjectInputStream ois) throws IOException,
ClassNotFoundException {
+ declaringClassName = ois.readUTF();
+ methodName = ois.readUTF();
+
+ int length = ois.readInt();
- if (hasArgs) {
- args = new Object[length];
+ parameterTypes = new Class<?>[length];
for (int i = 0; i < length; i++) {
try {
- args[i] = ois.readObject();
+ parameterTypes[i] = (Class<?>) ois.readObject();
} catch (IOException e) {
- StringBuilder incompleteMethod =
getIncompleteMethodString(length, i);
+ StringBuilder incompleteMethod =
getIncompleteMethodString(i, 0);
throw new IOException(
"Could not deserialize "
+ i
- + "th argument of method "
+ + "th parameter type of method "
+ incompleteMethod
+ '.',
e);
} catch (ClassNotFoundException e) {
- // note: wrapping this CNFE into another CNFE does not
overwrite the
- // Exception
- // stored in the ObjectInputStream (see
- // ObjectInputStream#readSerialData)
+ // note: wrapping this CNFE into another CNFE does not
overwrite the Exception
+ // stored in the ObjectInputStream (see
ObjectInputStream#readSerialData)
// -> add a suppressed exception that adds a more specific
message
- StringBuilder incompleteMethod =
getIncompleteMethodString(length, i);
+ StringBuilder incompleteMethod =
getIncompleteMethodString(i, 0);
e.addSuppressed(
new ClassNotFoundException(
"Could not deserialize "
+ i
+ "th "
- + "argument of method "
+ + "parameter type of method "
+ incompleteMethod
- + ". This indicates that the
argument "
+ + ". This indicates that the
parameter "
+ "type is not part of the system
class loader."));
throw e;
}
}
- } else {
- args = null;
+
+ boolean hasArgs = ois.readBoolean();
+
+ if (hasArgs) {
+ args = new Object[length];
+
+ for (int i = 0; i < length; i++) {
+ try {
+ args[i] = ois.readObject();
+ } catch (IOException e) {
+ StringBuilder incompleteMethod =
getIncompleteMethodString(length, i);
+ throw new IOException(
+ "Could not deserialize "
+ + i
+ + "th argument of method "
+ + incompleteMethod
+ + '.',
+ e);
+ } catch (ClassNotFoundException e) {
+ // note: wrapping this CNFE into another CNFE does not
overwrite the
+ // Exception
+ // stored in the ObjectInputStream (see
+ // ObjectInputStream#readSerialData)
+ // -> add a suppressed exception that adds a more
specific message
+ StringBuilder incompleteMethod =
getIncompleteMethodString(length, i);
+ e.addSuppressed(
+ new ClassNotFoundException(
+ "Could not deserialize "
+ + i
+ + "th "
+ + "argument of method "
+ + incompleteMethod
+ + ". This indicates that the
argument "
+ + "type is not part of the
system class loader."));
+ throw e;
+ }
+ }
+ } else {
+ args = null;
+ }
}
- }
- private StringBuilder getIncompleteMethodString(int lastMethodTypeIdx, int
lastArgumentIdx) {
- StringBuilder incompleteMethod = new StringBuilder();
- incompleteMethod.append(methodName).append('(');
- for (int i = 0; i < lastMethodTypeIdx; ++i) {
- incompleteMethod.append(parameterTypes[i].getCanonicalName());
- if (i < lastArgumentIdx) {
- incompleteMethod.append(": ").append(args[i]);
+ private StringBuilder getIncompleteMethodString(
+ int lastMethodTypeIdx, int lastArgumentIdx) {
+ StringBuilder incompleteMethod = new StringBuilder();
+ incompleteMethod.append(methodName).append('(');
+ for (int i = 0; i < lastMethodTypeIdx; ++i) {
+ incompleteMethod.append(parameterTypes[i].getCanonicalName());
+ if (i < lastArgumentIdx) {
+ incompleteMethod.append(": ").append(args[i]);
+ }
+ incompleteMethod.append(", ");
}
- incompleteMethod.append(", ");
+ incompleteMethod.append("...)"); // some parameters could not be
deserialized
+ return incompleteMethod;
}
- incompleteMethod.append("...)"); // some parameters could not be
deserialized
- return incompleteMethod;
}
}