This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e1e84e9  [FLINK-26347][rpc] Using system classloader in 
deserialization of RemoteRpcInvocation
e1e84e9 is described below

commit e1e84e96526429d6e6735e0f969a20f2fb7ae0e2
Author: Yangze Guo <[email protected]>
AuthorDate: Tue Mar 1 14:31:30 2022 +0800

    [FLINK-26347][rpc] Using system classloader in deserialization of 
RemoteRpcInvocation
    
    This closes #18935.
---
 .../runtime/rpc/messages/RemoteRpcInvocation.java  | 242 +++++++++++++--------
 1 file changed, 147 insertions(+), 95 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
index 7b8a460..685ab56 100644
--- 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
+++ 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc.messages;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -35,10 +36,8 @@ import java.io.Serializable;
 public class RemoteRpcInvocation implements RpcInvocation, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private String declaringClassName;
-    private String methodName;
-    private Class<?>[] parameterTypes;
-    private Object[] args;
+    // Wrap the invocation information to ease the serialization.
+    private RemoteRpcInvocation.MethodInvocation methodInvocation;
 
     private transient String toString;
 
@@ -61,29 +60,27 @@ public class RemoteRpcInvocation implements RpcInvocation, 
Serializable {
                                 + "be serializable for remote rpc calls.");
             }
         }
-        this.declaringClassName = declaringClassName;
-        this.methodName = methodName;
-        this.parameterTypes = Preconditions.checkNotNull(parameterTypes);
-        this.args = args;
+        methodInvocation =
+                new MethodInvocation(declaringClassName, methodName, 
parameterTypes, args);
     }
 
     @Override
     public String getMethodName() {
-        return methodName;
+        return methodInvocation.getMethodName();
     }
 
     private String getDeclaringClassName() {
-        return declaringClassName;
+        return methodInvocation.getDeclaringClassName();
     }
 
     @Override
     public Class<?>[] getParameterTypes() {
-        return parameterTypes;
+        return methodInvocation.getParameterTypes();
     }
 
     @Override
     public Object[] getArgs() {
-        return args;
+        return methodInvocation.getArgs();
     }
 
     @Override
@@ -108,129 +105,184 @@ public class RemoteRpcInvocation implements 
RpcInvocation, Serializable {
     // -------------------------------------------------------------------
 
     private void writeObject(ObjectOutputStream oos) throws IOException {
-        oos.writeUTF(declaringClassName);
-        oos.writeUTF(methodName);
+        // Translate it to byte array so that we can deserialize classes which 
cannot be found in
+        // akka class loader.
+        byte[] bytes = InstantiationUtil.serializeObject(methodInvocation);
+        oos.writeObject(bytes);
+    }
+
+    private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
+        byte[] bytes = (byte[]) ois.readObject();
+        methodInvocation =
+                InstantiationUtil.deserializeObject(bytes, 
ClassLoader.getSystemClassLoader());
+    }
+
+    // -------------------------------------------------------------------
+    // Utility classes
+    // -------------------------------------------------------------------
+
+    /** Wrapper class for the method invocation information. */
+    private static final class MethodInvocation implements Serializable {
+        private static final long serialVersionUID = 1L;
 
-        oos.writeInt(parameterTypes.length);
+        private String declaringClassName;
+        private String methodName;
+        private Class<?>[] parameterTypes;
+        private Object[] args;
 
-        for (Class<?> parameterType : parameterTypes) {
-            oos.writeObject(parameterType);
+        private MethodInvocation(
+                final String declaringClassName,
+                final String methodName,
+                final Class<?>[] parameterTypes,
+                final Object[] args) {
+            this.declaringClassName = declaringClassName;
+            this.methodName = methodName;
+            this.parameterTypes = Preconditions.checkNotNull(parameterTypes);
+            this.args = args;
         }
 
-        if (args != null) {
-            oos.writeBoolean(true);
+        String getDeclaringClassName() {
+            return declaringClassName;
+        }
 
-            for (int i = 0; i < args.length; i++) {
-                try {
-                    oos.writeObject(args[i]);
-                } catch (IOException e) {
-                    throw new IOException(
-                            "Could not serialize "
-                                    + i
-                                    + "th argument of method "
-                                    + methodName
-                                    + ". This indicates that the argument type 
"
-                                    + args.getClass().getName()
-                                    + " is not serializable. Arguments have to 
"
-                                    + "be serializable for remote rpc calls.",
-                            e);
-                }
-            }
-        } else {
-            oos.writeBoolean(false);
+        String getMethodName() {
+            return methodName;
         }
-    }
 
-    private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
-        declaringClassName = ois.readUTF();
-        methodName = ois.readUTF();
+        Class<?>[] getParameterTypes() {
+            return parameterTypes;
+        }
+
+        Object[] getArgs() {
+            return args;
+        }
 
-        int length = ois.readInt();
+        private void writeObject(ObjectOutputStream oos) throws IOException {
+            oos.writeUTF(declaringClassName);
+            oos.writeUTF(methodName);
 
-        parameterTypes = new Class<?>[length];
+            oos.writeInt(parameterTypes.length);
 
-        for (int i = 0; i < length; i++) {
-            try {
-                parameterTypes[i] = (Class<?>) ois.readObject();
-            } catch (IOException e) {
-                StringBuilder incompleteMethod = getIncompleteMethodString(i, 
0);
-                throw new IOException(
-                        "Could not deserialize "
-                                + i
-                                + "th parameter type of method "
-                                + incompleteMethod
-                                + '.',
-                        e);
-            } catch (ClassNotFoundException e) {
-                // note: wrapping this CNFE into another CNFE does not 
overwrite the Exception
-                //       stored in the ObjectInputStream (see 
ObjectInputStream#readSerialData)
-                // -> add a suppressed exception that adds a more specific 
message
-                StringBuilder incompleteMethod = getIncompleteMethodString(i, 
0);
-                e.addSuppressed(
-                        new ClassNotFoundException(
-                                "Could not deserialize "
+            for (Class<?> parameterType : parameterTypes) {
+                oos.writeObject(parameterType);
+            }
+
+            if (args != null) {
+                oos.writeBoolean(true);
+
+                for (int i = 0; i < args.length; i++) {
+                    try {
+                        oos.writeObject(args[i]);
+                    } catch (IOException e) {
+                        throw new IOException(
+                                "Could not serialize "
                                         + i
-                                        + "th "
-                                        + "parameter type of method "
-                                        + incompleteMethod
-                                        + ". This indicates that the parameter 
"
-                                        + "type is not part of the system 
class loader."));
-                throw e;
+                                        + "th argument of method "
+                                        + methodName
+                                        + ". This indicates that the argument 
type "
+                                        + args.getClass().getName()
+                                        + " is not serializable. Arguments 
have to "
+                                        + "be serializable for remote rpc 
calls.",
+                                e);
+                    }
+                }
+            } else {
+                oos.writeBoolean(false);
             }
         }
 
-        boolean hasArgs = ois.readBoolean();
+        private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
+            declaringClassName = ois.readUTF();
+            methodName = ois.readUTF();
+
+            int length = ois.readInt();
 
-        if (hasArgs) {
-            args = new Object[length];
+            parameterTypes = new Class<?>[length];
 
             for (int i = 0; i < length; i++) {
                 try {
-                    args[i] = ois.readObject();
+                    parameterTypes[i] = (Class<?>) ois.readObject();
                 } catch (IOException e) {
-                    StringBuilder incompleteMethod = 
getIncompleteMethodString(length, i);
+                    StringBuilder incompleteMethod = 
getIncompleteMethodString(i, 0);
                     throw new IOException(
                             "Could not deserialize "
                                     + i
-                                    + "th argument of method "
+                                    + "th parameter type of method "
                                     + incompleteMethod
                                     + '.',
                             e);
                 } catch (ClassNotFoundException e) {
-                    // note: wrapping this CNFE into another CNFE does not 
overwrite the
-                    // Exception
-                    //       stored in the ObjectInputStream (see
-                    // ObjectInputStream#readSerialData)
+                    // note: wrapping this CNFE into another CNFE does not 
overwrite the Exception
+                    //       stored in the ObjectInputStream (see 
ObjectInputStream#readSerialData)
                     // -> add a suppressed exception that adds a more specific 
message
-                    StringBuilder incompleteMethod = 
getIncompleteMethodString(length, i);
+                    StringBuilder incompleteMethod = 
getIncompleteMethodString(i, 0);
                     e.addSuppressed(
                             new ClassNotFoundException(
                                     "Could not deserialize "
                                             + i
                                             + "th "
-                                            + "argument of method "
+                                            + "parameter type of method "
                                             + incompleteMethod
-                                            + ". This indicates that the 
argument "
+                                            + ". This indicates that the 
parameter "
                                             + "type is not part of the system 
class loader."));
                     throw e;
                 }
             }
-        } else {
-            args = null;
+
+            boolean hasArgs = ois.readBoolean();
+
+            if (hasArgs) {
+                args = new Object[length];
+
+                for (int i = 0; i < length; i++) {
+                    try {
+                        args[i] = ois.readObject();
+                    } catch (IOException e) {
+                        StringBuilder incompleteMethod = 
getIncompleteMethodString(length, i);
+                        throw new IOException(
+                                "Could not deserialize "
+                                        + i
+                                        + "th argument of method "
+                                        + incompleteMethod
+                                        + '.',
+                                e);
+                    } catch (ClassNotFoundException e) {
+                        // note: wrapping this CNFE into another CNFE does not 
overwrite the
+                        // Exception
+                        //       stored in the ObjectInputStream (see
+                        // ObjectInputStream#readSerialData)
+                        // -> add a suppressed exception that adds a more 
specific message
+                        StringBuilder incompleteMethod = 
getIncompleteMethodString(length, i);
+                        e.addSuppressed(
+                                new ClassNotFoundException(
+                                        "Could not deserialize "
+                                                + i
+                                                + "th "
+                                                + "argument of method "
+                                                + incompleteMethod
+                                                + ". This indicates that the 
argument "
+                                                + "type is not part of the 
system class loader."));
+                        throw e;
+                    }
+                }
+            } else {
+                args = null;
+            }
         }
-    }
 
-    private StringBuilder getIncompleteMethodString(int lastMethodTypeIdx, int 
lastArgumentIdx) {
-        StringBuilder incompleteMethod = new StringBuilder();
-        incompleteMethod.append(methodName).append('(');
-        for (int i = 0; i < lastMethodTypeIdx; ++i) {
-            incompleteMethod.append(parameterTypes[i].getCanonicalName());
-            if (i < lastArgumentIdx) {
-                incompleteMethod.append(": ").append(args[i]);
+        private StringBuilder getIncompleteMethodString(
+                int lastMethodTypeIdx, int lastArgumentIdx) {
+            StringBuilder incompleteMethod = new StringBuilder();
+            incompleteMethod.append(methodName).append('(');
+            for (int i = 0; i < lastMethodTypeIdx; ++i) {
+                incompleteMethod.append(parameterTypes[i].getCanonicalName());
+                if (i < lastArgumentIdx) {
+                    incompleteMethod.append(": ").append(args[i]);
+                }
+                incompleteMethod.append(", ");
             }
-            incompleteMethod.append(", ");
+            incompleteMethod.append("...)"); // some parameters could not be 
deserialized
+            return incompleteMethod;
         }
-        incompleteMethod.append("...)"); // some parameters could not be 
deserialized
-        return incompleteMethod;
     }
 }

Reply via email to