Repository: flink
Updated Branches:
  refs/heads/master 6731ec1e4 -> 1dda3ad00


[FLINK-4793] [types] Improve lambda constructor reference handling

This closes #2621.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dda3ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dda3ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dda3ad0

Branch: refs/heads/master
Commit: 1dda3ad009667697a620359e997e83a5ba2447dd
Parents: 6731ec1
Author: twalthr <twal...@apache.org>
Authored: Tue Oct 11 15:33:20 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Wed Oct 12 11:02:45 2016 +0200

----------------------------------------------------------------------
 flink-core/pom.xml                              |   7 +
 .../common/functions/util/FunctionUtils.java    |  69 --------
 .../java/typeutils/TypeExtractionException.java |  57 +++++++
 .../api/java/typeutils/TypeExtractionUtils.java | 167 +++++++++++++++++++
 .../flink/api/java/typeutils/TypeExtractor.java |  80 ++++-----
 .../java/type/lambdas/LambdaExtractionTest.java |  27 +--
 .../javaApiOperators/lambdas/MapITCase.java     |  20 ++-
 7 files changed, 306 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 865a253..cfa2cbb 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,6 +80,13 @@ under the License.
                        </exclusions>
                </dependency>
 
+               <!-- ASM is needed for type extraction -->
+               <dependency>
+                       <groupId>org.ow2.asm</groupId>
+                       <artifactId>asm-all</artifactId>
+                       <version>${asm.version}</version>
+               </dependency>
+
                <!-- test dependencies -->
                <dependency>
                        <groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index ffd885b..2bb1cb3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.lang.reflect.Method;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFunction;
@@ -62,73 +60,6 @@ public final class FunctionUtils {
                        return defaultContext;
                }
        }
-       
-       public static Method checkAndExtractLambdaMethod(Function function) {
-               try {
-                       // get serialized lambda
-                       Object serializedLambda = null;
-                       for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
-                               try {
-                                       Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
-                                       replaceMethod.setAccessible(true);
-                                       Object serialVersion = replaceMethod.invoke(function);
-
-                                       // check if class is a lambda function
-                                       if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
-
-                                               // check if SerializedLambda class is present
-                                               try {
-                                                       Class.forName("java.lang.invoke.SerializedLambda");
-                                               }
-                                               catch (Exception e) {
-                                                       throw new UnsupportedOperationException("User code tries to use lambdas, but framework is running with a Java version < 8");
-                                               }
-                                               serializedLambda = serialVersion;
-                                               break;
-                                       }
-                               }
-                               catch (NoSuchMethodException e) {
-                                       // thrown if the method is not there. fall through the loop
-                               }
-                       }
-
-                       // not a lambda method -> return null
-                       if (serializedLambda == null) {
-                               return null;
-                       }
-
-                       // find lambda method
-                       Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
-                       Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
-
-                       String className = (String) implClassMethod.invoke(serializedLambda);
-                       String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
-
-                       Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader());
-
-                       Method[] methods = implClass.getDeclaredMethods();
-                       Method parameterizedMethod = null;
-                       for (Method method : methods) {
-                               if(method.getName().equals(methodName)) {
-                                       if(parameterizedMethod != null) {
-                                               // It is very unlikely that a class contains multiple e.g. "lambda$2()" but its possible
-                                               // Actually, the signature need to be checked, but this is very complex
-                                               throw new Exception("Lambda method name is not unique.");
-                                       }
-                                       else {
-                                               parameterizedMethod = method;
-                                       }
-                               }
-                       }
-                       if (parameterizedMethod == null) {
-                               throw new Exception("No lambda method found.");
-                       }
-                       return parameterizedMethod;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not extract lambda method out of function: " + e.getClass().getSimpleName() + " - " + e.getMessage(), e);
-               }
-       }
 
        /**
         * Private constructor to prevent instantiation.

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
new file mode 100644
index 0000000..0dad55d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Type extraction always contains some uncertainty due to unpredictable JVM differences
+ * between vendors or versions. This exception is thrown if an assumption failed during extraction.
+ */
+@Internal
+public class TypeExtractionException extends Exception {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates a new exception with no message.
+        */
+       public TypeExtractionException() {
+               super();
+       }
+
+       /**
+        * Creates a new exception with the given message.
+        *
+        * @param message The exception message.
+        */
+       public TypeExtractionException(String message) {
+               super(message);
+       }
+
+       /**
+        * Creates a new exception with the given message and cause.
+        *
+        * @param message The exception message.
+        * @param e cause
+        */
+       public TypeExtractionException(String message, Throwable e) {
+               super(message, e);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
new file mode 100644
index 0000000..4439612
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.Function;
+import static org.objectweb.asm.Type.getConstructorDescriptor;
+import static org.objectweb.asm.Type.getMethodDescriptor;
+
+@Internal
+public class TypeExtractionUtils {
+
+       private TypeExtractionUtils() {
+               // do not allow instantiation
+       }
+
+       /**
+        * Similar to a Java 8 Executable but with a return type.
+        */
+       public static class LambdaExecutable {
+
+               private Type[] parameterTypes;
+               private Type returnType;
+               private String name;
+               private Object executable;
+
+               public LambdaExecutable(Constructor<?> constructor) {
+                       this.parameterTypes = constructor.getGenericParameterTypes();
+                       this.returnType = constructor.getDeclaringClass();
+                       this.name = constructor.getName();
+                       this.executable = constructor;
+               }
+
+               public LambdaExecutable(Method method) {
+                       this.parameterTypes = method.getGenericParameterTypes();
+                       this.returnType = method.getGenericReturnType();
+                       this.name = method.getName();
+                       this.executable = method;
+               }
+
+               public Type[] getParameterTypes() {
+                       return parameterTypes;
+               }
+
+               public Type getReturnType() {
+                       return returnType;
+               }
+
+               public String getName() {
+                       return name;
+               }
+
+               public boolean executablesEquals(Method m) {
+                       return executable.equals(m);
+               }
+
+               public boolean executablesEquals(Constructor<?> c) {
+                       return executable.equals(c);
+               }
+       }
+
+       public static LambdaExecutable checkAndExtractLambda(Function function) throws TypeExtractionException {
+               try {
+                       // get serialized lambda
+                       Object serializedLambda = null;
+                       for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+                               try {
+                                       Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
+                                       replaceMethod.setAccessible(true);
+                                       Object serialVersion = replaceMethod.invoke(function);
+
+                                       // check if class is a lambda function
+                                       if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
+
+                                               // check if SerializedLambda class is present
+                                               try {
+                                                       Class.forName("java.lang.invoke.SerializedLambda");
+                                               }
+                                               catch (Exception e) {
+                                                       throw new TypeExtractionException("User code tries to use lambdas, but framework is running with a Java version < 8");
+                                               }
+                                               serializedLambda = serialVersion;
+                                               break;
+                                       }
+                               }
+                               catch (NoSuchMethodException e) {
+                                       // thrown if the method is not there. fall through the loop
+                               }
+                       }
+
+                       // not a lambda method -> return null
+                       if (serializedLambda == null) {
+                               return null;
+                       }
+
+                       // find lambda method
+                       Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
+                       Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
+                       Method implMethodSig = serializedLambda.getClass().getDeclaredMethod("getImplMethodSignature");
+
+                       String className = (String) implClassMethod.invoke(serializedLambda);
+                       String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
+                       String methodSig = (String) implMethodSig.invoke(serializedLambda);
+
+                       Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader());
+
+                       // find constructor
+                       if (methodName.equals("<init>")) {
+                               Constructor<?>[] constructors = implClass.getDeclaredConstructors();
+                               for (Constructor<?> constructor : constructors) {
+                                       if(getConstructorDescriptor(constructor).equals(methodSig)) {
+                                               return new LambdaExecutable(constructor);
+                                       }
+                               }
+                       }
+                       // find method
+                       else {
+                               List<Method> methods = getAllDeclaredMethods(implClass);
+                               for (Method method : methods) {
+                                       if(method.getName().equals(methodName) && getMethodDescriptor(method).equals(methodSig)) {
+                                               return new LambdaExecutable(method);
+                                       }
+                               }
+                       }
+                       throw new TypeExtractionException("No lambda method found.");
+               }
+               catch (Exception e) {
+                       throw new TypeExtractionException("Could not extract lambda method out of function: " +
+                               e.getClass().getSimpleName() + " - " + e.getMessage(), e);
+               }
+       }
+
+       /**
+        * Returns all declared methods of a class including methods of superclasses.
+        */
+       public static List<Method> getAllDeclaredMethods(Class<?> clazz) {
+               List<Method> result = new ArrayList<>();
+               while (clazz != null) {
+                       Method[] methods = clazz.getDeclaredMethods();
+                       Collections.addAll(result, methods);
+                       clazz = clazz.getSuperclass();
+               }
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a0b09f5..c1febea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -53,7 +53,6 @@ import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -66,6 +65,9 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
@@ -380,22 +382,27 @@ public class TypeExtractor {
                String functionName,
                boolean allowMissing) {
                try {
-                       final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
-                       if (m != null) {
+                       final LambdaExecutable exec;
+                       try {
+                               exec = checkAndExtractLambda(function);
+                       } catch (TypeExtractionException e) {
+                               throw new InvalidTypesException("Internal error occurred.", e);
+                       }
+                       if (exec != null) {
                                // check for lambda type erasure
-                               validateLambdaGenericParameters(m);
+                               validateLambdaGenericParameters(exec);
 
                                // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-                               final int paramLen = m.getGenericParameterTypes().length - 1;
+                               final int paramLen = exec.getParameterTypes().length - 1;
 
-                               // method references "this" implicitly
+                               // executable references "this" implicitly
                                if (paramLen < 0) {
-                                       // methods declaring class can also be a super class of the input type
-                                       // we only validate if the method exists in input type
-                                       validateInputContainsMethod(m, inType);
+                                       // executable declaring class can also be a super class of the input type
+                                       // we only validate if the executable exists in input type
+                                       validateInputContainsExecutable(exec, inType);
                                }
                                else {
-                                       final Type input = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+                                       final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
                                        validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType);
                                }
 
@@ -403,7 +410,7 @@ public class TypeExtractor {
                                        return ((ResultTypeQueryable<OUT>) function).getProducedType();
                                }
                                return new TypeExtractor().privateCreateTypeInfo(
-                                       (outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(),
+                                       (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
                                        inType,
                                        null);
                        }
@@ -496,22 +503,27 @@ public class TypeExtractor {
                String functionName,
                boolean allowMissing) {
                try {
-                       final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
-                       if (m != null) {
+                       final LambdaExecutable exec;
+                       try {
+                               exec = checkAndExtractLambda(function);
+                       } catch (TypeExtractionException e) {
+                               throw new InvalidTypesException("Internal error occurred.", e);
+                       }
+                       if (exec != null) {
                                // check for lambda type erasure
-                               validateLambdaGenericParameters(m);
+                               validateLambdaGenericParameters(exec);
                                
                                // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-                               final int paramLen = m.getGenericParameterTypes().length - 1;
-                               final Type input1 = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1];
-                               final Type input2 = (outputTypeArgumentIndex >= 0 ) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+                               final int paramLen = exec.getParameterTypes().length - 1;
+                               final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1];
+                               final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
                                validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type);
                                validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type);
                                if(function instanceof ResultTypeQueryable) {
                                        return ((ResultTypeQueryable<OUT>) function).getProducedType();
                                }
                                return new TypeExtractor().privateCreateTypeInfo(
-                                       (outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(),
+                                       (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
                                        in1Type,
                                        in2Type);
                        }
@@ -1358,14 +1370,20 @@ public class TypeExtractor {
                }
        }
 
-       private static void validateInputContainsMethod(Method m, TypeInformation<?> typeInfo) {
+       private static void validateInputContainsExecutable(LambdaExecutable exec, TypeInformation<?> typeInfo) {
                List<Method> methods = getAllDeclaredMethods(typeInfo.getTypeClass());
                for (Method method : methods) {
-                       if (method.equals(m)) {
+                       if (exec.executablesEquals(method)) {
                                return;
                        }
                }
-               throw new InvalidTypesException("Type contains no method '" + m.getName() + "'.");
+               Constructor<?>[] constructors = typeInfo.getTypeClass().getDeclaredConstructors();
+               for (Constructor<?> constructor : constructors) {
+                       if (exec.executablesEquals(constructor)) {
+                               return;
+                       }
+               }
+               throw new InvalidTypesException("Type contains no executable '" + exec.getName() + "'.");
        }
 
        // --------------------------------------------------------------------------------------------
@@ -1488,14 +1506,14 @@ public class TypeExtractor {
                }
        }
        
-       private static void validateLambdaGenericParameters(Method m) {
+       private static void validateLambdaGenericParameters(LambdaExecutable exec) {
                // check the arguments
-               for (Type t : m.getGenericParameterTypes()) {
+               for (Type t : exec.getParameterTypes()) {
                        validateLambdaGenericParameter(t);
                }
 
                // check the return type
-               validateLambdaGenericParameter(m.getGenericReturnType());
+               validateLambdaGenericParameter(exec.getReturnType());
        }
 
        private static void validateLambdaGenericParameter(Type t) {
@@ -1974,20 +1992,6 @@ public class TypeExtractor {
                return false;
        }
 
-       
-       // recursively determine all declared methods
-       private static List<Method> getAllDeclaredMethods(Class<?> clazz) {
-               List<Method> result = new ArrayList<Method>();
-               while (clazz != null) {
-                       Method[] methods = clazz.getDeclaredMethods();
-                       for (Method method : methods) {
-                               result.add(method);
-                       }
-                       clazz = clazz.getSuperclass();
-               }
-               return result;
-       }
-
        @Internal
        public static Class<?> typeToClass(Type t) {
                if (t instanceof Class) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 64b7ae7..0d7415a 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.java.type.lambdas;
 
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -33,7 +35,6 @@ import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -74,14 +75,14 @@ public class LambdaExtractionTest {
                        MapFunction<Integer, String> instanceLambda = Object::toString;
                        MapFunction<String, Integer> constructorLambda = Integer::new;
 
-                       assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface));
-                       assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass));
-                       assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass));
-                       assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromDerived));
-                       assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(staticLambda));
-                       assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(instanceLambda));
-                       assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(constructorLambda));
-                       assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(STATIC_LAMBDA));
+                       assertNull(checkAndExtractLambda(anonymousFromInterface));
+                       assertNull(checkAndExtractLambda(anonymousFromClass));
+                       assertNull(checkAndExtractLambda(fromProperClass));
+                       assertNull(checkAndExtractLambda(fromDerived));
+                       assertNotNull(checkAndExtractLambda(staticLambda));
+                       assertNotNull(checkAndExtractLambda(instanceLambda));
+                       assertNotNull(checkAndExtractLambda(constructorLambda));
+                       assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -272,14 +273,14 @@ public class LambdaExtractionTest {
        public void testInstanceMethodRefSameType() {
                MapFunction<MyType, Integer> f = MyType::getKey;
                TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
-               Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
        }
 
        @Test
        public void testInstanceMethodRefSuperType() {
                MapFunction<Integer, String> f = Object::toString;
                TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
-               Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+               Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
        }
 
        public static class MySubtype extends MyType {
@@ -290,14 +291,14 @@ public class LambdaExtractionTest {
        public void testInstanceMethodRefSuperTypeProtected() {
                MapFunction<MySubtype, Integer> f = MyType::getKey2;
                TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
-               Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
        }
 
        @Test
        public void testConstructorMethodRef() {
                MapFunction<String, Integer> f = Integer::new;
                TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
-               Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
index cda1f1c..87c1fa5 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -24,6 +24,20 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class MapITCase extends JavaProgramTestBase {
 
+       public static class Trade {
+
+               public String v;
+
+               public Trade(String v) {
+                       this.v = v;
+               }
+
+               @Override
+               public String toString() {
+                       return v;
+               }
+       }
+
        private static final String EXPECTED_RESULT = "22\n" +
                        "22\n" +
                        "23\n" +
@@ -41,7 +55,11 @@ public class MapITCase extends JavaProgramTestBase {
                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
-               DataSet<String> mappedDs = stringDs.map(Object::toString).map (s -> s.replace("1", "2"));
+               DataSet<String> mappedDs = stringDs
+                       .map(Object::toString)
+                       .map (s -> s.replace("1", "2"))
+                       .map(Trade::new)
+                       .map(Trade::toString);
                mappedDs.writeAsText(resultPath);
                env.execute();
        }

Reply via email to