Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/113#discussion_r17596867
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
 ---
    @@ -57,31 +57,71 @@ public static RuntimeContext getFunctionRuntimeContext 
(Function function, Runti
                        return defaultContext;
                }
        }
    +   
    +   public static Method checkAndExtractLambdaMethod(Function function) {
    +           try {
    +                   // get serialized lambda
    +                   Object serializedLambda = null;
    +                   for (Class<?> clazz = function.getClass(); clazz != 
null; clazz = clazz.getSuperclass()) {
    +                           try {
    +                                   Method replaceMethod = 
clazz.getDeclaredMethod("writeReplace");
    +                                   replaceMethod.setAccessible(true);
    +                                   Object serialVersion = 
replaceMethod.invoke(function);
     
    -   public static boolean isLambdaFunction(Function function) {
    -           if (function == null) {
    -                   throw new IllegalArgumentException();
    -           }
    -           
    -           for (Class<?> clazz = function.getClass(); clazz != null; clazz 
= clazz.getSuperclass()) {
    -                   try {
    -                           Method replaceMethod = 
clazz.getDeclaredMethod("writeReplace");
    -                           replaceMethod.setAccessible(true);
    -                           Object serialVersion = 
replaceMethod.invoke(function);
    -                           
    -                           if 
(serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda"))
 {
    -                                   return true;
    +                                   // check if class is a lambda function
    +                                   if 
(serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda"))
 {
    +
    +                                           // check if SerializedLambda 
class is present
    +                                           try {
    +                                                   
Class.forName("java.lang.invoke.SerializedLambda");
    +                                           }
    +                                           catch (Exception e) {
    +                                                   throw new 
UnsupportedOperationException("User code tries to use lambdas, but framework is 
running with a Java version < 8");
    +                                           }
    +                                           serializedLambda = 
serialVersion;
    +                                           break;
    +                                   }
    +                           }
    +                           catch (NoSuchMethodException e) {
    +                                   // thrown if the method is not there. 
fall through the loop
                                }
                        }
    -                   catch (NoSuchMethodException e) {
    -                           // thrown if the method is not there. fall 
through the loop
    +
    +                   // not a lambda method -> return null
    +                   if (serializedLambda == null) {
    +                           return null;
                        }
    -                   catch (Throwable t) {
    -                           // this should not happen, we are not executing 
any method code.
    -                           throw new RuntimeException("Error while 
checking whether function is a lambda.", t);
    +
    +                   // find lambda method
    +                   Method implClassMethod = 
serializedLambda.getClass().getDeclaredMethod("getImplClass");
    +                   Method implMethodNameMethod = 
serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
    +
    +                   String className = (String) 
implClassMethod.invoke(serializedLambda);
    +                   String methodName = (String) 
implMethodNameMethod.invoke(serializedLambda);
    +
    +                   Class<?> implClass = 
Class.forName(className.replace('/', '.'));
    +
    +                   Method[] methods = implClass.getDeclaredMethods();
    +                   Method parameterizedMethod = null;
    +                   for(Method method : methods) {
    +                           if(method.getName().equals(methodName)) {
    +                                   if(parameterizedMethod != null) {
    +                                           // It is very unlikely that a 
class contains multiple e.g. "lambda$2()" but its possible
    +                                           // Actually, the signature need 
to be checked, but this is very complex
    +                                           throw new Exception("Lambda 
method name is not unique.");
    --- End diff --
    
    Good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to