[FLINK-6783] Changed passing index of type argument while extracting return 
type.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcaf816d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcaf816d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcaf816d

Branch: refs/heads/master
Commit: bcaf816dc5313c6c7de1e3436cc87036fd2ea3d0
Parents: 1cc1bb4
Author: Dawid Wysakowicz <[email protected]>
Authored: Thu Jun 1 13:17:25 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Jun 8 10:42:41 2017 +0200

----------------------------------------------------------------------
 .../flink/api/common/functions/Partitioner.java |   6 +-
 .../api/java/typeutils/TypeExtractionUtils.java |  74 +++
 .../flink/api/java/typeutils/TypeExtractor.java | 642 ++++++++++++-------
 .../java/type/lambdas/LambdaExtractionTest.java |  13 +
 .../org/apache/flink/cep/CEPLambdaTest.java     |   2 -
 .../org/apache/flink/cep/PatternStream.java     |  24 +-
 .../cep/operator/CEPMigration11to13Test.java    |   4 +-
 .../flink/graph/asm/translate/Translate.java    |  44 +-
 .../api/datastream/AllWindowedStream.java       |  57 +-
 .../api/datastream/AsyncDataStream.java         |  13 +-
 .../api/datastream/CoGroupedStreams.java        |  15 +-
 .../api/datastream/ConnectedStreams.java        |  48 +-
 .../streaming/api/datastream/DataStream.java    |  16 +-
 .../streaming/api/datastream/JoinedStreams.java |  40 +-
 .../streaming/api/datastream/KeyedStream.java   |  16 +-
 .../api/datastream/WindowedStream.java          |  58 +-
 .../windowing/WindowTranslationTest.java        | 232 +++----
 17 files changed, 846 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
index 6c237ed..c272d3a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
@@ -22,15 +22,15 @@ import org.apache.flink.annotation.Public;
 
 /**
  * Function to implement a custom partition assignment for keys.
- * 
+ *
  * @param <K> The type of the key to be partitioned.
  */
 @Public
-public interface Partitioner<K> extends java.io.Serializable {
+public interface Partitioner<K> extends java.io.Serializable, Function {
 
        /**
         * Computes the partition for the given key.
-        * 
+        *
         * @param key The key.
         * @param numPartitions The number of partitions to partition into.
         * @return The partition index.

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index 0aac257..c6ffd55 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
@@ -28,6 +29,8 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+
 import static org.objectweb.asm.Type.getConstructorDescriptor;
 import static org.objectweb.asm.Type.getMethodDescriptor;
 
@@ -161,6 +164,77 @@ public class TypeExtractionUtils {
        }
 
        /**
+        * Extracts type from given index from lambda. It supports nested types.
+        *
+        * @param exec lambda function to extract the type from
+        * @param lambdaTypeArgumentIndices position of type to extract in type 
hierarchy
+        * @param paramLen count of total parameters of the lambda (including 
closure parameters)
+        * @param baseParametersLen count of lambda interface parameters 
(without closure parameters)
+        * @return extracted type
+        */
+       public static Type extractTypeFromLambda(
+               LambdaExecutable exec,
+               int[] lambdaTypeArgumentIndices,
+               int paramLen,
+               int baseParametersLen) {
+               Type output = exec.getParameterTypes()[paramLen - 
baseParametersLen + lambdaTypeArgumentIndices[0]];
+               for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
+                       output = extractTypeArgument(output, 
lambdaTypeArgumentIndices[i]);
+               }
+               return output;
+       }
+
+       /**
+        * This method extracts the n-th type argument from the given type. An 
InvalidTypesException
+        * is thrown if the type does not have any type arguments or if the 
index exceeds the number
+        * of type arguments.
+        *
+        * @param t Type to extract the type arguments from
+        * @param index Index of the type argument to extract
+        * @return The extracted type argument
+        * @throws InvalidTypesException if the given type does not have any 
type arguments or if the
+        * index exceeds the number of type arguments.
+        */
+       public static Type extractTypeArgument(Type t, int index) throws 
InvalidTypesException {
+               if (t instanceof ParameterizedType) {
+                       Type[] actualTypeArguments = ((ParameterizedType) 
t).getActualTypeArguments();
+
+                       if (index < 0 || index >= actualTypeArguments.length) {
+                               throw new InvalidTypesException("Cannot extract 
the type argument with index " +
+                                                                               
                index + " because the type has only " + 
actualTypeArguments.length +
+                                                                               
                " type arguments.");
+                       } else {
+                               return actualTypeArguments[index];
+                       }
+               } else {
+                       throw new InvalidTypesException("The given type " + t + 
" is not a parameterized type.");
+               }
+       }
+
+       /**
+        * Extracts a Single Abstract Method (SAM) as defined in Java 
Specification (4.3.2. The Class Object,
+        * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given 
class.
+        *
+        * @param baseClass
+        * @throws InvalidTypesException if the given class does not implement
+        * @return
+        */
+       public static Method getSingleAbstractMethod(Class<?> baseClass) {
+               Method sam = null;
+               for (Method method : baseClass.getMethods()) {
+                       if (Modifier.isAbstract(method.getModifiers())) {
+                               if (sam == null) {
+                                       sam = method;
+                               } else {
+                                       throw new InvalidTypesException(
+                                               "Given class: " + baseClass + " 
is not a FunctionalInterface. It does not have a SAM.");
+                               }
+                       }
+               }
+               return sam;
+       }
+
+       /**
         * Returns all declared methods of a class including methods of 
superclasses.
         */
        public static List<Method> getAllDeclaredMethods(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 112ca38..c50dfc9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -116,6 +116,8 @@ public class TypeExtractor {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(TypeExtractor.class);
 
+       public static final int[] NO_INDEX = new int[] {};
+
        protected TypeExtractor() {
                // only create instances for special use cases
        }
@@ -161,9 +163,18 @@ public class TypeExtractor {
        public static <IN, OUT> TypeInformation<OUT> 
getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) mapInterface, 
MapFunction.class, false, false, inType, functionName, allowMissing);
+               return getUnaryOperatorReturnType(
+                       (Function) mapInterface,
+                       MapFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       NO_INDEX,
+                       inType,
+                       functionName,
+                       allowMissing);
        }
-       
+
 
        @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, 
TypeInformation<IN> inType) {
@@ -174,7 +185,16 @@ public class TypeExtractor {
        public static <IN, OUT> TypeInformation<OUT> 
getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) flatMapInterface, 
FlatMapFunction.class, false, true, inType, functionName, allowMissing);
+               return getUnaryOperatorReturnType(
+                       (Function) flatMapInterface,
+                       FlatMapFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       new int[]{1, 0},
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        /**
@@ -194,7 +214,16 @@ public class TypeExtractor {
        @Deprecated
        public static <IN, OUT> TypeInformation<OUT> 
getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> 
inType, String functionName, boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) foldInterface, 
FoldFunction.class, false, false, inType, functionName, allowMissing);
+               return getUnaryOperatorReturnType(
+                       (Function) foldInterface,
+                       FoldFunction.class,
+                       0,
+                       1,
+                       new int[]{1},
+                       NO_INDEX,
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -205,7 +234,15 @@ public class TypeExtractor {
                        boolean allowMissing)
        {
                return getUnaryOperatorReturnType(
-                       function, AggregateFunction.class, 0, 1, inType, 
functionName, allowMissing);
+                       function,
+                       AggregateFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       NO_INDEX,
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -216,7 +253,15 @@ public class TypeExtractor {
                        boolean allowMissing)
        {
                return getUnaryOperatorReturnType(
-                               function, AggregateFunction.class, 0, 2, 
inType, functionName, allowMissing);
+                       function,
+                       AggregateFunction.class,
+                       0,
+                       2,
+                       NO_INDEX,
+                       NO_INDEX,
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -228,7 +273,16 @@ public class TypeExtractor {
        public static <IN, OUT> TypeInformation<OUT> 
getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) 
mapPartitionInterface, MapPartitionFunction.class, true, true, inType, 
functionName, allowMissing);
+               return getUnaryOperatorReturnType(
+                       (Function) mapPartitionInterface,
+                       MapPartitionFunction.class,
+                       0,
+                       1,
+                       new int[]{0, 0},
+                       new int[]{1, 0},
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -240,7 +294,16 @@ public class TypeExtractor {
        public static <IN, OUT> TypeInformation<OUT> 
getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) 
groupReduceInterface, GroupReduceFunction.class, true, true, inType, 
functionName, allowMissing);
+               return getUnaryOperatorReturnType(
+                       (Function) groupReduceInterface,
+                       GroupReduceFunction.class,
+                       0,
+                       1,
+                       new int[]{0, 0},
+                       new int[]{1, 0},
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -252,7 +315,16 @@ public class TypeExtractor {
        public static <IN, OUT> TypeInformation<OUT> 
getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, 
TypeInformation<IN> inType,
                                                                                
                                                                        String 
functionName, boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) combineInterface, 
GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
+               return getUnaryOperatorReturnType(
+                       (Function) combineInterface,
+                       GroupCombineFunction.class,
+                       0,
+                       1,
+                       new int[]{0, 0},
+                       new int[]{1, 0},
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -266,8 +338,19 @@ public class TypeExtractor {
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
-               return getBinaryOperatorReturnType((Function) joinInterface, 
FlatJoinFunction.class, false, true,
-                               in1Type, in2Type, functionName, allowMissing);
+               return getBinaryOperatorReturnType(
+                       (Function) joinInterface,
+                       FlatJoinFunction.class,
+                       0,
+                       1,
+                       2,
+                       new int[]{0},
+                       new int[]{1},
+                       new int[]{2, 0},
+                       in1Type,
+                       in2Type,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -281,8 +364,19 @@ public class TypeExtractor {
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
-               return getBinaryOperatorReturnType((Function) joinInterface, 
JoinFunction.class, false, false,
-                               in1Type, in2Type, functionName, allowMissing);
+               return getBinaryOperatorReturnType(
+                       (Function) joinInterface,
+                       JoinFunction.class,
+                       0,
+                       1,
+                       2,
+                       new int[]{0},
+                       new int[]{1},
+                       NO_INDEX,
+                       in1Type,
+                       in2Type,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -296,8 +390,19 @@ public class TypeExtractor {
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
-               return getBinaryOperatorReturnType((Function) coGroupInterface, 
CoGroupFunction.class, true, true,
-                               in1Type, in2Type, functionName, allowMissing);
+               return getBinaryOperatorReturnType(
+                       (Function) coGroupInterface,
+                       CoGroupFunction.class,
+                       0,
+                       1,
+                       2,
+                       new int[]{0, 0},
+                       new int[]{1, 0},
+                       new int[]{2, 0},
+                       in1Type,
+                       in2Type,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -311,8 +416,19 @@ public class TypeExtractor {
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
-               return getBinaryOperatorReturnType((Function) crossInterface, 
CrossFunction.class, false, false,
-                               in1Type, in2Type, functionName, allowMissing);
+               return getBinaryOperatorReturnType(
+                       (Function) crossInterface,
+                       CrossFunction.class,
+                       0,
+                       1,
+                       2,
+                       new int[]{0},
+                       new int[]{1},
+                       NO_INDEX,
+                       in1Type,
+                       in2Type,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -324,7 +440,16 @@ public class TypeExtractor {
        public static <IN, OUT> TypeInformation<OUT> 
getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface,
                        TypeInformation<IN> inType, String functionName, 
boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) selectorInterface, 
KeySelector.class, false, false, inType, functionName, allowMissing);
+               return getUnaryOperatorReturnType(
+                       (Function) selectorInterface,
+                       KeySelector.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       NO_INDEX,
+                       inType,
+                       functionName,
+                       allowMissing);
        }
 
        @PublicEvolving
@@ -333,11 +458,53 @@ public class TypeExtractor {
        }
 
        @PublicEvolving
-       public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> 
partitioner, String functionName, boolean allowMissing) {
-               return new 
TypeExtractor().privateCreateTypeInfo(Partitioner.class, 
partitioner.getClass(), 0, null, null);
+       public static <T> TypeInformation<T> getPartitionerTypes(
+               Partitioner<T> partitioner,
+               String functionName,
+               boolean allowMissing) {
+               try {
+                       final LambdaExecutable exec;
+                       try {
+                               exec = checkAndExtractLambda(partitioner);
+                       } catch (TypeExtractionException e) {
+                               throw new InvalidTypesException("Internal error 
occurred.", e);
+                       }
+                       if (exec != null) {
+                               // check for lambda type erasure
+                               validateLambdaGenericParameters(exec);
+
+                               // parameters must be accessed from behind, 
since JVM can add additional parameters e.g. when using local variables inside 
lambda function
+                               // paramLen is the total number of parameters 
of the provided lambda, it includes parameters added through closure
+                               final int paramLen = 
exec.getParameterTypes().length;
+
+                               final Method sam = 
TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class);
+                               // number of parameters the SAM of implemented 
interface has, the parameter indexing aplicates to this range
+                               final int baseParametersLen = 
sam.getParameterTypes().length;
+
+                               final Type keyType = 
TypeExtractionUtils.extractTypeFromLambda(
+                                       exec,
+                                       new int[]{0},
+                                       paramLen,
+                                       baseParametersLen);
+                               return new 
TypeExtractor().privateCreateTypeInfo(keyType, null, null);
+                       } else {
+                               return new 
TypeExtractor().privateCreateTypeInfo(
+                                       Partitioner.class,
+                                       partitioner.getClass(),
+                                       0,
+                                       null,
+                                       null);
+                       }
+               } catch (InvalidTypesException e) {
+                       if (allowMissing) {
+                               return (TypeInformation<T>) new 
MissingTypeInfo(functionName != null ? functionName : partitioner.toString(), 
e);
+                       } else {
+                               throw e;
+                       }
+               }
        }
-       
-       
+
+
        @SuppressWarnings("unchecked")
        @PublicEvolving
        public static <IN> TypeInformation<IN> 
getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) {
@@ -354,49 +521,21 @@ public class TypeExtractor {
        /**
         * Returns the unary operator's return type.
         *
-        * @param function Function to extract the return type from
-        * @param baseClass Base class of the function
-        * @param hasIterable True if the first function parameter is an 
iterable, otherwise false
-        * @param hasCollector True if the function has an additional collector 
parameter, otherwise false
-        * @param inType Type of the input elements (In case of an iterable, it 
is the element type)
-        * @param functionName Function name
-        * @param allowMissing Can the type information be missing
-        * @param <IN> Input type
-        * @param <OUT> Output type
-        * @return TypeInformation of the return type of the function
-        */
-       @SuppressWarnings("unchecked")
-       @PublicEvolving
-       public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
-               Function function,
-               Class<?> baseClass,
-               boolean hasIterable,
-               boolean hasCollector,
-               TypeInformation<IN> inType,
-               String functionName,
-               boolean allowMissing) {
-
-               return getUnaryOperatorReturnType(
-                       function,
-                       baseClass,
-                       hasIterable ? 0 : -1,
-                       hasCollector ? 0 : -1,
-                       inType,
-                       functionName,
-                       allowMissing);
-       }
-
-       /**
-        * Returns the unary operator's return type.
+        * <p><b>NOTE:</b> lambda type indices allow extraction of Type from 
lambdas. To extract input type <b>IN</b>
+        * from the function given below one should pass {@code new int[] 
{0,1,0}} as lambdaInputTypeArgumentIndices.
+        *
+        * <pre>
+        * <code>
+        * OUT apply(Map<String, List<IN>> value)
+        * </code>
+        * </pre>
         *
         * @param function Function to extract the return type from
         * @param baseClass Base class of the function
-        * @param inputTypeArgumentIndex Index of the type argument of 
function's first parameter
-        *                               specifying the input type if it is 
wrapped (Iterable, Map,
-        *                               etc.). Otherwise -1.
-        * @param outputTypeArgumentIndex Index of the type argument of 
function's second parameter
-        *                                specifying the output type if it is 
wrapped in a Collector.
-        *                                Otherwise -1.
+        * @param inputTypeArgumentIndex Index of input type in the class 
specification
+        * @param outputTypeArgumentIndex Index of output type in the class 
specification
+        * @param lambdaInputTypeArgumentIndices Table of indices of the type 
argument specifying the input type. See example.
+        * @param lambdaOutputTypeArgumentIndices Table of indices of the type 
argument specifying the input type. See example.
         * @param inType Type of the input elements (In case of an iterable, it 
is the element type)
         * @param functionName Function name
         * @param allowMissing Can the type information be missing
@@ -411,6 +550,8 @@ public class TypeExtractor {
                Class<?> baseClass,
                int inputTypeArgumentIndex,
                int outputTypeArgumentIndex,
+               int[] lambdaInputTypeArgumentIndices,
+               int[] lambdaOutputTypeArgumentIndices,
                TypeInformation<IN> inType,
                String functionName,
                boolean allowMissing) {
@@ -422,37 +563,63 @@ public class TypeExtractor {
                                throw new InvalidTypesException("Internal error 
occurred.", e);
                        }
                        if (exec != null) {
+                               Preconditions.checkArgument(
+                                       lambdaInputTypeArgumentIndices != null 
&& lambdaInputTypeArgumentIndices.length >= 1,
+                                       "Indices for input type arguments 
within lambda not provided");
+                               Preconditions.checkArgument(
+                                       lambdaOutputTypeArgumentIndices != null,
+                                       "Indices for output type arguments 
within lambda not provided");
                                // check for lambda type erasure
                                validateLambdaGenericParameters(exec);
 
                                // parameters must be accessed from behind, 
since JVM can add additional parameters e.g. when using local variables inside 
lambda function
-                               final int paramLen = 
exec.getParameterTypes().length - 1;
+                               // paramLen is the total number of parameters 
of the provided lambda, it includes parameters added through closure
+                               final int paramLen = 
exec.getParameterTypes().length;
+
+                               final Method sam = 
TypeExtractionUtils.getSingleAbstractMethod(baseClass);
+
+                               // number of parameters the SAM of implemented 
interface has, the parameter indexing aplicates to this range
+                               final int baseParametersLen = 
sam.getParameterTypes().length;
 
                                // executable references "this" implicitly
-                               if (paramLen < 0) {
+                               if (paramLen <= 0) {
                                        // executable declaring class can also 
be a super class of the input type
                                        // we only validate if the executable 
exists in input type
                                        validateInputContainsExecutable(exec, 
inType);
                                }
                                else {
-                                       final Type input = 
(outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : 
exec.getParameterTypes()[paramLen];
-                                       
validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, 
inputTypeArgumentIndex) : input, inType);
+                                       final Type input = 
TypeExtractionUtils.extractTypeFromLambda(
+                                               exec,
+                                               lambdaInputTypeArgumentIndices,
+                                               paramLen,
+                                               baseParametersLen);
+                                       validateInputType(input, inType);
                                }
 
                                if (function instanceof ResultTypeQueryable) {
                                        return ((ResultTypeQueryable<OUT>) 
function).getProducedType();
                                }
-                               return new 
TypeExtractor().privateCreateTypeInfo(
-                                       (outputTypeArgumentIndex >= 0) ? 
extractTypeArgument(exec.getParameterTypes()[paramLen], 
outputTypeArgumentIndex) : exec.getReturnType(),
-                                       inType,
-                                       null);
-                       }
-                       else {
-                               validateInputType(baseClass, 
function.getClass(), 0, inType);
+
+                               final Type output;
+                               if (lambdaOutputTypeArgumentIndices.length > 0) 
{
+                                       output = 
TypeExtractionUtils.extractTypeFromLambda(
+                                               exec,
+                                               lambdaOutputTypeArgumentIndices,
+                                               paramLen,
+                                               baseParametersLen);
+                               } else {
+                                       output = exec.getReturnType();
+                               }
+
+                               return new 
TypeExtractor().privateCreateTypeInfo(output, inType, null);
+                       } else {
+                               
Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument 
index was not provided");
+                               
Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument 
index was not provided");
+                               validateInputType(baseClass, 
function.getClass(), inputTypeArgumentIndex, inType);
                                if(function instanceof ResultTypeQueryable) {
                                        return ((ResultTypeQueryable<OUT>) 
function).getProducedType();
                                }
-                               return new 
TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, 
inType, null);
+                               return new 
TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 
outputTypeArgumentIndex, inType, null);
                        }
                }
                catch (InvalidTypesException e) {
@@ -467,54 +634,23 @@ public class TypeExtractor {
        /**
         * Returns the binary operator's return type.
         *
-        * @param function Function to extract the return type from
-        * @param baseClass Base class of the function
-        * @param hasIterables True if the first function parameter is an 
iterable, otherwise false
-        * @param hasCollector True if the function has an additional collector 
parameter, otherwise false
-        * @param in1Type Type of the left side input elements (In case of an 
iterable, it is the element type)
-        * @param in2Type Type of the right side input elements (In case of an 
iterable, it is the element type)
-        * @param functionName Function name
-        * @param allowMissing Can the type information be missing
-        * @param <IN1> Left side input type
-        * @param <IN2> Right side input type
-        * @param <OUT> Output type
-        * @return TypeInformation of the return type of the function
-        */
-       @SuppressWarnings("unchecked")
-       @PublicEvolving
-       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getBinaryOperatorReturnType(
-               Function function,
-               Class<?> baseClass,
-               boolean hasIterables,
-               boolean hasCollector,
-               TypeInformation<IN1> in1Type,
-               TypeInformation<IN2> in2Type,
-               String functionName,
-               boolean allowMissing) {
-
-               return getBinaryOperatorReturnType(
-                       function,
-                       baseClass,
-                       hasIterables ? 0 : -1,
-                       hasCollector ? 0 : -1,
-                       in1Type,
-                       in2Type,
-                       functionName,
-                       allowMissing
-               );
-       }
-
-       /**
-        * Returns the binary operator's return type.
+        * <p><b>NOTE:</b> lambda type indices allows extraction of Type from 
lambdas. To extract input type <b>IN1</b>
+        * from the function given below one should pass {@code new int[] 
{0,1,0}} as lambdaInput1TypeArgumentIndices.
+        *
+        * <pre>
+        * <code>
+        * OUT apply(Map<String, List<IN1>> value1, List<IN2> value2)
+        * </code>
+        * </pre>
         *
         * @param function Function to extract the return type from
         * @param baseClass Base class of the function
-        * @param inputTypeArgumentIndex Index of the type argument of 
function's first parameter
-        *                               specifying the input type if it is 
wrapped (Iterable, Map,
-        *                               etc.). Otherwise -1.
-        * @param outputTypeArgumentIndex Index of the type argument of 
functions second parameter
-        *                                specifying the output type if it is 
wrapped in a Collector.
-        *                                Otherwise -1.
+        * @param input1TypeArgumentIndex Index of first input type in the 
class specification
+        * @param input2TypeArgumentIndex Index of second input type in the 
class specification
+        * @param outputTypeArgumentIndex Index of output type in the class 
specification
+        * @param lambdaInput1TypeArgumentIndices Table of indices of the type 
argument specifying the first input type. See example.
+        * @param lambdaInput2TypeArgumentIndices Table of indices of the type 
argument specifying the second input type. See example.
+        * @param lambdaOutputTypeArgumentIndices Table of indices of the type 
argument specifying the input type. See example.
         * @param in1Type Type of the left side input elements (In case of an 
iterable, it is the element type)
         * @param in2Type Type of the right side input elements (In case of an 
iterable, it is the element type)
         * @param functionName Function name
@@ -529,8 +665,12 @@ public class TypeExtractor {
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getBinaryOperatorReturnType(
                Function function,
                Class<?> baseClass,
-               int inputTypeArgumentIndex,
+               int input1TypeArgumentIndex,
+               int input2TypeArgumentIndex,
                int outputTypeArgumentIndex,
+               int[] lambdaInput1TypeArgumentIndices,
+               int[] lambdaInput2TypeArgumentIndices,
+               int[] lambdaOutputTypeArgumentIndices,
                TypeInformation<IN1> in1Type,
                TypeInformation<IN2> in2Type,
                String functionName,
@@ -543,30 +683,67 @@ public class TypeExtractor {
                                throw new InvalidTypesException("Internal error 
occurred.", e);
                        }
                        if (exec != null) {
+                               Preconditions.checkArgument(
+                                       lambdaInput1TypeArgumentIndices != null 
&& lambdaInput1TypeArgumentIndices.length >= 1,
+                                       "Indices for first input type arguments 
within lambda not provided");
+                               Preconditions.checkArgument(
+                                       lambdaInput2TypeArgumentIndices != null 
&& lambdaInput1TypeArgumentIndices.length >= 1,
+                                       "Indices for second input type 
arguments within lambda not provided");
+                               Preconditions.checkArgument(
+                                       lambdaOutputTypeArgumentIndices != null,
+                                       "Indices for output type arguments 
within lambda not provided");
                                // check for lambda type erasure
                                validateLambdaGenericParameters(exec);
-                               
+
+                               final Method sam = 
TypeExtractionUtils.getSingleAbstractMethod(baseClass);
+                               final int baseParametersLen = 
sam.getParameterTypes().length;
+
                                // parameters must be accessed from behind, 
since JVM can add additional parameters e.g. when using local variables inside 
lambda function
-                               final int paramLen = 
exec.getParameterTypes().length - 1;
-                               final Type input1 = (outputTypeArgumentIndex >= 
0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen 
- 1];
-                               final Type input2 = (outputTypeArgumentIndex >= 
0 ) ? exec.getParameterTypes()[paramLen - 1] : 
exec.getParameterTypes()[paramLen];
-                               validateInputType((inputTypeArgumentIndex >= 0) 
? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type);
-                               validateInputType((inputTypeArgumentIndex >= 0) 
? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type);
+                               final int paramLen = 
exec.getParameterTypes().length;
+
+                               final Type input1 = 
TypeExtractionUtils.extractTypeFromLambda(
+                                       exec,
+                                       lambdaInput1TypeArgumentIndices,
+                                       paramLen,
+                                       baseParametersLen);
+                               final Type input2 = 
TypeExtractionUtils.extractTypeFromLambda(
+                                       exec,
+                                       lambdaInput2TypeArgumentIndices,
+                                       paramLen,
+                                       baseParametersLen);
+
+                               validateInputType(input1, in1Type);
+                               validateInputType(input2, in2Type);
                                if(function instanceof ResultTypeQueryable) {
                                        return ((ResultTypeQueryable<OUT>) 
function).getProducedType();
                                }
+
+                               final Type output;
+                               if (lambdaOutputTypeArgumentIndices.length > 0) 
{
+                                       output = 
TypeExtractionUtils.extractTypeFromLambda(
+                                               exec,
+                                               lambdaOutputTypeArgumentIndices,
+                                               paramLen,
+                                               baseParametersLen);
+                               } else {
+                                       output = exec.getReturnType();
+                               }
+
                                return new 
TypeExtractor().privateCreateTypeInfo(
-                                       (outputTypeArgumentIndex >= 0) ? 
extractTypeArgument(exec.getParameterTypes()[paramLen], 
outputTypeArgumentIndex) : exec.getReturnType(),
+                                       output,
                                        in1Type,
                                        in2Type);
                        }
                        else {
-                               validateInputType(baseClass, 
function.getClass(), 0, in1Type);
-                               validateInputType(baseClass, 
function.getClass(), 1, in2Type);
+                               
Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type 
argument index was not provided");
+                               
Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type 
argument index was not provided");
+                               
Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument 
index was not provided");
+                               validateInputType(baseClass, 
function.getClass(), input1TypeArgumentIndex, in1Type);
+                               validateInputType(baseClass, 
function.getClass(), input2TypeArgumentIndex, in2Type);
                                if(function instanceof ResultTypeQueryable) {
                                        return ((ResultTypeQueryable<OUT>) 
function).getProducedType();
                                }
-                               return new 
TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, 
in1Type, in2Type);
+                               return new 
TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 
outputTypeArgumentIndex, in1Type, in2Type);
                        }
                }
                catch (InvalidTypesException e) {
@@ -577,7 +754,7 @@ public class TypeExtractor {
                        }
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Create type information
        // 
--------------------------------------------------------------------------------------------
@@ -586,7 +763,7 @@ public class TypeExtractor {
        public static <T> TypeInformation<T> createTypeInfo(Class<T> type) {
                return (TypeInformation<T>) createTypeInfo((Type) type);
        }
-       
+
        public static TypeInformation<?> createTypeInfo(Type t) {
                TypeInformation<?> ti = new 
TypeExtractor().privateCreateTypeInfo(t);
                if (ti == null) {
@@ -628,46 +805,46 @@ public class TypeExtractor {
                }
                return ti;
        }
-       
+
        // ----------------------------------- private methods 
----------------------------------------
-       
+
        private TypeInformation<?> privateCreateTypeInfo(Type t) {
                ArrayList<Type> typeHierarchy = new ArrayList<Type>();
                typeHierarchy.add(t);
                return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, 
null);
        }
-       
+
        // for (Rich)Functions
        @SuppressWarnings("unchecked")
        private <IN1, IN2, OUT> TypeInformation<OUT> 
privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
                ArrayList<Type> typeHierarchy = new ArrayList<Type>();
                Type returnType = getParameterType(baseClass, typeHierarchy, 
clazz, returnParamPos);
-               
+
                TypeInformation<OUT> typeInfo;
-               
+
                // return type is a variable -> try to get the type info from 
the input directly
                if (returnType instanceof TypeVariable<?>) {
                        typeInfo = (TypeInformation<OUT>) 
createTypeInfoFromInputs((TypeVariable<?>) returnType, typeHierarchy, in1Type, 
in2Type);
-                       
+
                        if (typeInfo != null) {
                                return typeInfo;
                        }
                }
-               
+
                // get info from hierarchy
                return (TypeInformation<OUT>) 
createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
        }
-       
+
        // for LambdaFunctions
        @SuppressWarnings("unchecked")
        private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type 
returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
                ArrayList<Type> typeHierarchy = new ArrayList<Type>();
-               
+
                // get info from hierarchy
                return (TypeInformation<OUT>) 
createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
        }
-       
+
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private <IN1, IN2, OUT> TypeInformation<OUT> 
createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
@@ -680,29 +857,29 @@ public class TypeExtractor {
                // check if type is a subclass of tuple
                else if (isClassType(t) && 
Tuple.class.isAssignableFrom(typeToClass(t))) {
                        Type curT = t;
-                       
+
                        // do not allow usage of Tuple as type
                        if (typeToClass(t).equals(Tuple.class)) {
                                throw new InvalidTypesException(
                                                "Usage of class Tuple as a type 
is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.");
                        }
-                                               
+
                        // go up the hierarchy until we reach immediate child 
of Tuple (with or without generics)
-                       // collect the types while moving up for a later 
top-down 
+                       // collect the types while moving up for a later 
top-down
                        while (!(isClassType(curT) && 
typeToClass(curT).getSuperclass().equals(Tuple.class))) {
                                typeHierarchy.add(curT);
                                curT = typeToClass(curT).getGenericSuperclass();
                        }
-                       
+
                        if(curT == Tuple0.class) {
                                return new TupleTypeInfo(Tuple0.class);
                        }
-                       
+
                        // check if immediate child of Tuple has generics
                        if (curT instanceof Class<?>) {
                                throw new InvalidTypesException("Tuple needs to 
be parameterized by using generics.");
                        }
-                       
+
                        typeHierarchy.add(curT);
 
                        // create the type information for the subtypes
@@ -718,13 +895,13 @@ public class TypeExtractor {
                        }
                        // return tuple info
                        return new TupleTypeInfo(typeToClass(t), subTypesInfo);
-                       
+
                }
                // type depends on another type
                // e.g. class MyMapper<E> extends MapFunction<String, E>
                else if (t instanceof TypeVariable) {
                        Type typeVar = materializeTypeVariable(typeHierarchy, 
(TypeVariable<?>) t);
-                       
+
                        if (!(typeVar instanceof TypeVariable)) {
                                return 
createTypeInfoWithTypeHierarchy(typeHierarchy, typeVar, in1Type, in2Type);
                        }
@@ -741,12 +918,12 @@ public class TypeExtractor {
                                }
                        }
                }
-               // arrays with generics 
+               // arrays with generics
                else if (t instanceof GenericArrayType) {
                        GenericArrayType genericArray = (GenericArrayType) t;
-                       
+
                        Type componentType = 
genericArray.getGenericComponentType();
-                       
+
                        // due to a Java 6 bug, it is possible that the JVM 
classifies e.g. String[] or int[] as GenericArrayType instead of Class
                        if (componentType instanceof Class) {
                                Class<?> componentClass = (Class<?>) 
componentType;
@@ -775,11 +952,11 @@ public class TypeExtractor {
                else if (t instanceof Class) {
                        return privateGetForClass((Class<OUT>) t, 
typeHierarchy);
                }
-               
+
                throw new InvalidTypesException("Type Information could not be 
created.");
        }
 
-       private <IN1, IN2> TypeInformation<?> 
createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> 
returnTypeHierarchy, 
+       private <IN1, IN2> TypeInformation<?> 
createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> 
returnTypeHierarchy,
                        TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> 
in2TypeInfo) {
 
                Type matReturnTypeVar = 
materializeTypeVariable(returnTypeHierarchy, returnTypeVar);
@@ -791,12 +968,12 @@ public class TypeExtractor {
                else {
                        returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
                }
-               
+
                // no input information exists
                if (in1TypeInfo == null && in2TypeInfo == null) {
                        return null;
                }
-               
+
                // create a new type hierarchy for the input
                ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
                // copy the function part of the type hierarchy
@@ -809,7 +986,7 @@ public class TypeExtractor {
                        }
                }
                ParameterizedType baseClass = (ParameterizedType) 
inputTypeHierarchy.get(inputTypeHierarchy.size() - 1);
-               
+
                TypeInformation<?> info = null;
                if (in1TypeInfo != null) {
                        // find the deepest type variable that describes the 
type of input 1
@@ -898,18 +1075,18 @@ public class TypeExtractor {
                // the input is a tuple
                else if (inTypeInfo instanceof TupleTypeInfo && 
isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) {
                        ParameterizedType tupleBaseClass;
-                       
+
                        // get tuple from possible tuple subclass
                        while (!(isClassType(inType) && 
typeToClass(inType).getSuperclass().equals(Tuple.class))) {
                                inputTypeHierarchy.add(inType);
                                inType = 
typeToClass(inType).getGenericSuperclass();
                        }
                        inputTypeHierarchy.add(inType);
-                       
+
                        // we can assume to be parameterized since we
                        // already did input validation
                        tupleBaseClass = (ParameterizedType) inType;
-                       
+
                        Type[] tupleElements = 
tupleBaseClass.getActualTypeArguments();
                        // go thru all tuple elements and search for type 
variables
                        for (int i = 0; i < tupleElements.length; i++) {
@@ -1068,13 +1245,13 @@ public class TypeExtractor {
        public static Type getParameterType(Class<?> baseClass, Class<?> clazz, 
int pos) {
                return getParameterType(baseClass, null, clazz, pos);
        }
-       
+
        private static Type getParameterType(Class<?> baseClass, 
ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) {
                if (typeHierarchy != null) {
                        typeHierarchy.add(clazz);
                }
                Type[] interfaceTypes = clazz.getGenericInterfaces();
-               
+
                // search in interfaces for base class
                for (Type t : interfaceTypes) {
                        Type parameter = 
getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
@@ -1082,18 +1259,18 @@ public class TypeExtractor {
                                return parameter;
                        }
                }
-               
+
                // search in superclass for base class
                Type t = clazz.getGenericSuperclass();
                Type parameter = getParameterTypeFromGenericType(baseClass, 
typeHierarchy, t, pos);
                if (parameter != null) {
                        return parameter;
                }
-               
-               throw new InvalidTypesException("The types of the interface " + 
baseClass.getName() + " could not be inferred. " + 
+
+               throw new InvalidTypesException("The types of the interface " + 
baseClass.getName() + " could not be inferred. " +
                                                "Support for synthetic 
interfaces, lambdas, and generic or raw types is limited at this point");
        }
-       
+
        private static Type getParameterTypeFromGenericType(Class<?> baseClass, 
ArrayList<Type> typeHierarchy, Type t, int pos) {
                // base class
                if (t instanceof ParameterizedType && 
baseClass.equals(((ParameterizedType) t).getRawType())) {
@@ -1109,7 +1286,7 @@ public class TypeExtractor {
                                typeHierarchy.add(t);
                        }
                        return getParameterType(baseClass, typeHierarchy, 
(Class<?>) ((ParameterizedType) t).getRawType(), pos);
-               }                       
+               }
                else if (t instanceof Class<?> && 
baseClass.isAssignableFrom((Class<?>) t)) {
                        if (typeHierarchy != null) {
                                typeHierarchy.add(t);
@@ -1118,11 +1295,11 @@ public class TypeExtractor {
                }
                return null;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Validate input
        // 
--------------------------------------------------------------------------------------------
-       
+
        private static void validateInputType(Type t, TypeInformation<?> 
inType) {
                ArrayList<Type> typeHierarchy = new ArrayList<Type>();
                try {
@@ -1132,7 +1309,7 @@ public class TypeExtractor {
                        throw new InvalidTypesException("Input mismatch: " + 
e.getMessage(), e);
                }
        }
-       
+
        private static void validateInputType(Class<?> baseClass, Class<?> 
clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
                ArrayList<Type> typeHierarchy = new ArrayList<Type>();
 
@@ -1152,21 +1329,21 @@ public class TypeExtractor {
                        throw new InvalidTypesException("Input mismatch: " + 
e.getMessage(), e);
                }
        }
-       
+
        @SuppressWarnings("unchecked")
        private static void validateInfo(ArrayList<Type> typeHierarchy, Type 
type, TypeInformation<?> typeInfo) {
                if (type == null) {
                        throw new InvalidTypesException("Unknown Error. Type is 
null.");
                }
-               
+
                if (typeInfo == null) {
                        throw new InvalidTypesException("Unknown Error. 
TypeInformation is null.");
                }
-               
+
                if (!(type instanceof TypeVariable<?>)) {
                        // check for Java Basic Types
                        if (typeInfo instanceof BasicTypeInfo) {
-                               
+
                                TypeInformation<?> actual;
                                // check if basic type at all
                                if (!(type instanceof Class<?>) || (actual = 
BasicTypeInfo.getInfoFor((Class<?>) type)) == null) {
@@ -1176,7 +1353,7 @@ public class TypeExtractor {
                                if (!typeInfo.equals(actual)) {
                                        throw new InvalidTypesException("Basic 
type '" + typeInfo + "' expected but was '" + actual + "'.");
                                }
-                               
+
                        }
                        // check for Java SQL time types
                        else if (typeInfo instanceof SqlTimeTypeInfo) {
@@ -1198,36 +1375,36 @@ public class TypeExtractor {
                                if (!(isClassType(type) && 
Tuple.class.isAssignableFrom(typeToClass(type)))) {
                                        throw new InvalidTypesException("Tuple 
type expected.");
                                }
-                               
+
                                // do not allow usage of Tuple as type
                                if (isClassType(type) && 
typeToClass(type).equals(Tuple.class)) {
                                        throw new 
InvalidTypesException("Concrete subclass of Tuple expected.");
                                }
-                               
+
                                // go up the hierarchy until we reach immediate 
child of Tuple (with or without generics)
                                while (!(isClassType(type) && 
typeToClass(type).getSuperclass().equals(Tuple.class))) {
                                        typeHierarchy.add(type);
                                        type = 
typeToClass(type).getGenericSuperclass();
                                }
-                               
+
                                if(type == Tuple0.class) {
                                        return;
                                }
-                               
+
                                // check if immediate child of Tuple has 
generics
                                if (type instanceof Class<?>) {
                                        throw new 
InvalidTypesException("Parameterized Tuple type expected.");
                                }
-                               
+
                                TupleTypeInfo<?> tti = (TupleTypeInfo<?>) 
typeInfo;
-                               
+
                                Type[] subTypes = ((ParameterizedType) 
type).getActualTypeArguments();
-                               
+
                                if (subTypes.length != tti.getArity()) {
                                        throw new InvalidTypesException("Tuple 
arity '" + tti.getArity() + "' expected but was '"
                                                        + subTypes.length + 
"'.");
                                }
-                               
+
                                for (int i = 0; i < subTypes.length; i++) {
                                        validateInfo(new 
ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
                                }
@@ -1258,16 +1435,16 @@ public class TypeExtractor {
                                                && !(type instanceof 
GenericArrayType && (component = ((GenericArrayType) 
type).getGenericComponentType()) != null)) {
                                        throw new InvalidTypesException("Array 
type expected.");
                                }
-                               
+
                                if (component instanceof TypeVariable<?>) {
                                        component = 
materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
                                        if (component instanceof TypeVariable) {
                                                return;
                                        }
                                }
-                               
+
                                validateInfo(typeHierarchy, component, 
((BasicArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());
-                               
+
                        }
                        // check for object array
                        else if (typeInfo instanceof ObjectArrayTypeInfo<?, ?>) 
{
@@ -1275,7 +1452,7 @@ public class TypeExtractor {
                                if (!(type instanceof Class<?> && ((Class<?>) 
type).isArray()) && !(type instanceof GenericArrayType)) {
                                        throw new InvalidTypesException("Object 
array type expected.");
                                }
-                               
+
                                // check component
                                Type component;
                                if (type instanceof Class<?>) {
@@ -1283,14 +1460,14 @@ public class TypeExtractor {
                                } else {
                                        component = ((GenericArrayType) 
type).getGenericComponentType();
                                }
-                               
+
                                if (component instanceof TypeVariable<?>) {
                                        component = 
materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
                                        if (component instanceof TypeVariable) {
                                                return;
                                        }
                                }
-                               
+
                                validateInfo(typeHierarchy, component, 
((ObjectArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());
                        }
                        // check for value
@@ -1299,7 +1476,7 @@ public class TypeExtractor {
                                if (!(type instanceof Class<?> && 
Value.class.isAssignableFrom((Class<?>) type))) {
                                        throw new InvalidTypesException("Value 
type expected.");
                                }
-                               
+
                                TypeInformation<?> actual;
                                // check value type contents
                                if (!((ValueTypeInfo<?>) 
typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends 
Value>) type))) {
@@ -1456,33 +1633,6 @@ public class TypeExtractor {
                return fieldCount;
        }
 
-       /**
-        * * This method extracts the n-th type argument from the given type. 
An InvalidTypesException
-        * is thrown if the type does not have any type arguments or if the 
index exceeds the number
-        * of type arguments.
-        *
-        * @param t Type to extract the type arguments from
-        * @param index Index of the type argument to extract
-        * @return The extracted type argument
-        * @throws InvalidTypesException if the given type does not have any 
type arguments or if the
-        * index exceeds the number of type arguments.
-        */
-       private static Type extractTypeArgument(Type t, int index) throws 
InvalidTypesException {
-               if(t instanceof ParameterizedType) {
-                       Type[] actualTypeArguments = ((ParameterizedType) 
t).getActualTypeArguments();
-
-                       if (index < 0 || index >= actualTypeArguments.length) {
-                               throw new InvalidTypesException("Cannot extract 
the type argument with index " +
-                                       index + " because the type has only " + 
actualTypeArguments.length +
-                                       " type arguments.");
-                       } else {
-                               return actualTypeArguments[index];
-                       }
-               } else {
-                       throw new InvalidTypesException("The given type " + t + 
" is not a parameterized type.");
-               }
-       }
-       
        private static void validateLambdaGenericParameters(LambdaExecutable 
exec) {
                // check the arguments
                for (Type t : exec.getParameterTypes()) {
@@ -1516,19 +1666,19 @@ public class TypeExtractor {
                // iterate thru hierarchy from top to bottom until type 
variable gets a class assigned
                for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
                        Type curT = typeHierarchy.get(i);
-                       
+
                        // parameterized type
                        if (curT instanceof ParameterizedType) {
                                Class<?> rawType = ((Class<?>) 
((ParameterizedType) curT).getRawType());
-                               
+
                                for (int paramIndex = 0; paramIndex < 
rawType.getTypeParameters().length; paramIndex++) {
-                                       
+
                                        TypeVariable<?> curVarOfCurT = 
rawType.getTypeParameters()[paramIndex];
-                                       
+
                                        // check if variable names match
                                        if (sameTypeVars(curVarOfCurT, 
inTypeTypeVar)) {
                                                Type curVarType = 
((ParameterizedType) curT).getActualTypeArguments()[paramIndex];
-                                               
+
                                                // another type variable level
                                                if (curVarType instanceof 
TypeVariable<?>) {
                                                        inTypeTypeVar = 
(TypeVariable<?>) curVarType;
@@ -1545,14 +1695,14 @@ public class TypeExtractor {
                // return the type variable of the deepest level
                return inTypeTypeVar;
        }
-       
+
        /**
         * Creates type information from a given Class such as Integer, 
String[] or POJOs.
-        * 
-        * This method does not support ParameterizedTypes such as Tuples or 
complex type hierarchies. 
+        *
+        * This method does not support ParameterizedTypes such as Tuples or 
complex type hierarchies.
         * In most cases {@link TypeExtractor#createTypeInfo(Type)} is the 
recommended method for type extraction
         * (a Class is a child of Type).
-        * 
+        *
         * @param clazz a Class to create TypeInformation for
         * @return TypeInformation that describes the passed Class
         */
@@ -1561,7 +1711,7 @@ public class TypeExtractor {
                typeHierarchy.add(clazz);
                return new TypeExtractor().privateGetForClass(clazz, 
typeHierarchy);
        }
-       
+
        private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, 
ArrayList<Type> typeHierarchy) {
                return privateGetForClass(clazz, typeHierarchy, null, null, 
null);
        }
@@ -1600,13 +1750,13 @@ public class TypeExtractor {
                        if (primitiveArrayInfo != null) {
                                return primitiveArrayInfo;
                        }
-                       
+
                        // basic type arrays: String[], Integer[], Double[]
                        BasicArrayTypeInfo<OUT, ?> basicArrayInfo = 
BasicArrayTypeInfo.getInfoFor(clazz);
                        if (basicArrayInfo != null) {
                                return basicArrayInfo;
                        }
-                       
+
                        // object arrays
                        else {
                                TypeInformation<?> componentTypeInfo = 
createTypeInfoWithTypeHierarchy(
@@ -1618,7 +1768,7 @@ public class TypeExtractor {
                                return ObjectArrayTypeInfo.getInfoFor(clazz, 
componentTypeInfo);
                        }
                }
-               
+
                // check for writable types
                if (isHadoopWritable(clazz)) {
                        return createHadoopWritableTypeInfo(clazz);
@@ -1635,13 +1785,13 @@ public class TypeExtractor {
                if (timeTypeInfo != null) {
                        return timeTypeInfo;
                }
-               
+
                // check for subclasses of Value
                if (Value.class.isAssignableFrom(clazz)) {
                        Class<? extends Value> valueClass = 
clazz.asSubclass(Value.class);
                        return (TypeInformation<OUT>) 
ValueTypeInfo.getValueTypeInfo(valueClass);
                }
-               
+
                // check for subclasses of Tuple
                if (Tuple.class.isAssignableFrom(clazz)) {
                        if(clazz == Tuple0.class) {
@@ -1680,13 +1830,13 @@ public class TypeExtractor {
                // return a generic type
                return new GenericTypeInfo<OUT>(clazz);
        }
-       
+
        /**
         * Checks if the given field is a valid pojo field:
         * - it is public
         * OR
         *  - there are getter and setter methods for the field.
-        *  
+        *
         * @param f field to check
         * @param clazz class of field
         * @param typeHierarchy type hierarchy for materializing generic types
@@ -1753,7 +1903,7 @@ public class TypeExtractor {
                        LOG.info("Class " + clazz.getName() + " is not public, 
cannot treat it as a POJO type. Will be handled as GenericType");
                        return new GenericTypeInfo<OUT>(clazz);
                }
-               
+
                // add the hierarchy of the POJO itself if it is generic
                if (parameterizedType != null) {
                        getTypeHierarchy(typeHierarchy, parameterizedType, 
Object.class);
@@ -1762,7 +1912,7 @@ public class TypeExtractor {
                else if (typeHierarchy.size() <= 1) {
                        getTypeHierarchy(typeHierarchy, clazz, Object.class);
                }
-               
+
                List<Field> fields = getAllDeclaredFields(clazz, false);
                if (fields.size() == 0) {
                        LOG.info("No fields detected for " + clazz + ". Cannot 
be used as a PojoType. Will be handled as GenericType");
@@ -1822,7 +1972,7 @@ public class TypeExtractor {
                        LOG.info("The default constructor of " + clazz + " 
should be Public to be used as a POJO.");
                        return null;
                }
-               
+
                // everything is checked, we return the pojo
                return pojoType;
        }
@@ -1870,7 +2020,7 @@ public class TypeExtractor {
                }
                return null;
        }
-       
+
        private static boolean hasFieldWithSameName(String name, List<Field> 
fields) {
                for(Field field : fields) {
                        if(name.equals(field.getName())) {
@@ -1879,7 +2029,7 @@ public class TypeExtractor {
                }
                return false;
        }
-       
+
        private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> 
pojoInfo, Field field) {
                for (int j = 0; j < pojoInfo.getArity(); j++) {
                        PojoField pf = ((PojoTypeInfo<?>) 
pojoInfo).getPojoFieldAt(j);
@@ -1911,20 +2061,20 @@ public class TypeExtractor {
                        Tuple t = (Tuple) value;
                        int numFields = t.getArity();
                        if(numFields != countFieldsInClass(value.getClass())) {
-                               // not a tuple since it has more fields. 
+                               // not a tuple since it has more fields.
                                return analyzePojo((Class<X>) value.getClass(), 
new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo 
here, because
                                // there is currently no other type that can 
handle such a class.
                        }
-                       
+
                        TypeInformation<?>[] infos = new 
TypeInformation[numFields];
                        for (int i = 0; i < numFields; i++) {
                                Object field = t.getField(i);
-                               
+
                                if (field == null) {
                                        throw new 
InvalidTypesException("Automatic type extraction is not possible on candidates 
with null values. "
                                                        + "Please specify the 
types directly.");
                                }
-                               
+
                                infos[i] = privateGetForObject(field);
                        }
                        return new TupleTypeInfo(value.getClass(), infos);
@@ -2013,10 +2163,10 @@ public class TypeExtractor {
        static void validateIfWritable(TypeInformation<?> typeInfo, Type type) {
                try {
                        // try to load the writable type info
-                       
+
                        Class<?> writableTypeInfoClass = Class
                                        
.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, 
typeInfo.getClass().getClassLoader());
-                       
+
                        if 
(writableTypeInfoClass.isAssignableFrom(typeInfo.getClass())) {
                                // this is actually a writable type info
                                // check if the type is a writable

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
 
b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 64ff605..7500d73 100644
--- 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ 
b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -261,6 +262,18 @@ public class LambdaExtractionTest {
                Assert.assertTrue(ti instanceof MissingTypeInfo);
        }
 
+       @Test
+       public void testPartitionerLambda() {
+               Partitioner<Tuple2<Integer, String>> partitioner = (key, 
numPartitions) -> key.f1.length() % numPartitions;
+               final TypeInformation<?> ti = 
TypeExtractor.getPartitionerTypes(partitioner);
+
+               Assert.assertTrue(ti.isTupleType());
+               Assert.assertEquals(2, ti.getArity());
+               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), 
BasicTypeInfo.INT_TYPE_INFO);
+               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+       }
+
        private static class MyType {
                private int key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java 
b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 4eff037..37bf872 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -53,7 +53,6 @@ public class CEPLambdaTest extends TestLogger {
         * Tests that a Java8 lambda can be passed as a CEP select function.
         */
        @Test
-       @Ignore
        public void testLambdaSelectFunction() {
                TypeInformation<EventA> eventTypeInformation = 
TypeExtractor.getForClass(EventA.class);
                TypeInformation<EventB> outputTypeInformation = 
TypeExtractor.getForClass(EventB.class);
@@ -81,7 +80,6 @@ public class CEPLambdaTest extends TestLogger {
         * Tests that a Java8 lambda can be passed as a CEP flat select 
function.
         */
        @Test
-       @Ignore
        public void testLambdaFlatSelectFunction() {
                TypeInformation<EventA> eventTypeInformation = 
TypeExtractor.getForClass(EventA.class);
                TypeInformation<EventB> outputTypeInformation = 
TypeExtractor.getForClass(EventB.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 71614cf..3131a94 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -83,8 +83,10 @@ public class PatternStream<T> {
                TypeInformation<R> returnType = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternSelectFunction,
                        PatternSelectFunction.class,
+                       0,
                        1,
-                       -1,
+                       new int[]{0, 1, 0},
+                       new int[]{},
                        inputStream.getType(),
                        null,
                        false);
@@ -142,8 +144,10 @@ public class PatternStream<T> {
                TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternTimeoutFunction,
                        PatternTimeoutFunction.class,
+                       0,
                        1,
-                       -1,
+                       new int[]{0, 1, 0},
+                       new int[]{},
                        inputStream.getType(),
                        null,
                        false);
@@ -151,8 +155,10 @@ public class PatternStream<T> {
                TypeInformation<R> rightTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternSelectFunction,
                        PatternSelectFunction.class,
+                       0,
                        1,
-                       -1,
+                       new int[]{0, 1, 0},
+                       new int[]{},
                        inputStream.getType(),
                        null,
                        false);
@@ -184,8 +190,10 @@ public class PatternStream<T> {
                TypeInformation<R> outTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternFlatSelectFunction,
                        PatternFlatSelectFunction.class,
-                       1,
                        0,
+                       1,
+                       new int[] {0, 1, 0},
+                       new int[] {1, 0},
                        inputStream.getType(),
                        null,
                        false);
@@ -244,8 +252,10 @@ public class PatternStream<T> {
                TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternFlatTimeoutFunction,
                        PatternFlatTimeoutFunction.class,
+                       0,
                        1,
-                       -1,
+                       new int[]{0, 1, 0},
+                       new int[]{2, 0},
                        inputStream.getType(),
                        null,
                        false);
@@ -253,8 +263,10 @@ public class PatternStream<T> {
                TypeInformation<R> rightTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternFlatSelectFunction,
                        PatternFlatSelectFunction.class,
+                       0,
                        1,
-                       -1,
+                       new int[]{0, 1, 0},
+                       new int[]{1, 0},
                        inputStream.getType(),
                        null,
                        false);

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 69ba42f..950b5da 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -232,7 +232,7 @@ public class CEPMigration11to13Test {
                NullByteKeySelector keySelector = new NullByteKeySelector();
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
-                               new KeyedOneInputStreamOperatorTestHarness<>(
+                               new 
KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
@@ -284,7 +284,7 @@ public class CEPMigration11to13Test {
                        OperatorStateHandles snapshot = harness.snapshot(1L, 
1L);
                        harness.close();
 
-                       harness = new KeyedOneInputStreamOperatorTestHarness<>(
+                       harness = new 
KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
                                new KeyedCEPPatternOperator<>(
                                        Event.createTypeSerializer(),
                                        false,

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 9c4f88e..4cb4e01 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -73,7 +73,16 @@ public class Translate {
 
                Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>) 
(Class<? extends Vertex>) Vertex.class;
                TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<OLD, 
VV>>) vertices.getType()).getTypeAt(0);
-               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, 
false, false, oldType, null, false);
+               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       translator,
+                       TranslateFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       new int[]{1},
+                       oldType,
+                       null,
+                       false);
                TypeInformation<VV> vertexValueType = 
((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1);
 
                TupleTypeInfo<Vertex<NEW, VV>> returnType = new 
TupleTypeInfo<>(vertexClass, newType, vertexValueType);
@@ -148,7 +157,16 @@ public class Translate {
 
                Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>) 
(Class<? extends Edge>) Edge.class;
                TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<OLD, EV>>) 
edges.getType()).getTypeAt(0);
-               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, 
false, false, oldType, null, false);
+               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       translator,
+                       TranslateFunction.class,
+                       0,
+                       1,
+                       new int[] {0},
+                       new int[] {1},
+                       oldType,
+                       null,
+                       false);
                TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD, 
EV>>) edges.getType()).getTypeAt(2);
 
                TupleTypeInfo<Edge<NEW, EV>> returnType = new 
TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
@@ -225,7 +243,16 @@ public class Translate {
                Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>) 
(Class<? extends Vertex>) Vertex.class;
                TypeInformation<K> idType = ((TupleTypeInfo<Vertex<K, OLD>>) 
vertices.getType()).getTypeAt(0);
                TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<K, OLD>>) 
vertices.getType()).getTypeAt(1);
-               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, 
false, false, oldType, null, false);
+               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       translator,
+                       TranslateFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       new int[]{1},
+                       oldType,
+                       null,
+                       false);
 
                TupleTypeInfo<Vertex<K, NEW>> returnType = new 
TupleTypeInfo<>(vertexClass, idType, newType);
 
@@ -300,7 +327,16 @@ public class Translate {
                Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>) (Class<? 
extends Edge>) Edge.class;
                TypeInformation<K> idType = ((TupleTypeInfo<Edge<K, OLD>>) 
edges.getType()).getTypeAt(0);
                TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<K, OLD>>) 
edges.getType()).getTypeAt(2);
-               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, 
false, false, oldType, null, false);
+               TypeInformation<NEW> newType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       translator,
+                       TranslateFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       new int[]{1},
+                       oldType,
+                       null,
+                       false);
 
                TupleTypeInfo<Edge<K, NEW>> returnType = new 
TupleTypeInfo<>(edgeClass, idType, idType, newType);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 31dbb4f..ae97109 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -236,8 +236,7 @@ public class AllWindowedStream<T, W extends Window> {
                        AllWindowFunction<T, R, W> function) {
 
                TypeInformation<T> inType = input.getType();
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, AllWindowFunction.class, true, true, inType, 
null, false);
+               TypeInformation<R> resultType = 
getAllWindowFunctionReturnType(function, inType);
 
                return reduce(reduceFunction, function, resultType);
        }
@@ -332,8 +331,7 @@ public class AllWindowedStream<T, W extends Window> {
                        ReduceFunction<T> reduceFunction,
                        ProcessAllWindowFunction<T, R, W> function) {
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, ProcessAllWindowFunction.class, true, true, 
input.getType(), null, false);
+               TypeInformation<R> resultType = 
getProcessAllWindowFunctionReturnType(function, input.getType());
 
                return reduce(reduceFunction, function, resultType);
        }
@@ -507,12 +505,41 @@ public class AllWindowedStream<T, W extends Window> {
                TypeInformation<V> aggResultType = 
TypeExtractor.getAggregateFunctionReturnType(
                                aggFunction, input.getType(), null, false);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, AllWindowFunction.class, true, 
true, aggResultType, null, false);
+               TypeInformation<R> resultType = 
getAllWindowFunctionReturnType(windowFunction, aggResultType);
 
                return aggregate(aggFunction, windowFunction, accumulatorType, 
aggResultType, resultType);
        }
 
+       private static <IN, OUT> TypeInformation<OUT> 
getAllWindowFunctionReturnType(
+                       AllWindowFunction<IN, OUT, ?> function,
+                       TypeInformation<IN> inType) {
+               return TypeExtractor.getUnaryOperatorReturnType(
+                       function,
+                       AllWindowFunction.class,
+                       0,
+                       1,
+                       new int[]{1, 0},
+                       new int[]{2, 0},
+                       inType,
+                       null,
+                       false);
+       }
+
+       private static <IN, OUT> TypeInformation<OUT> 
getProcessAllWindowFunctionReturnType(
+                       ProcessAllWindowFunction<IN, OUT, ?> function,
+                       TypeInformation<IN> inType) {
+               return TypeExtractor.getUnaryOperatorReturnType(
+                       function,
+                       ProcessAllWindowFunction.class,
+                       0,
+                       1,
+                       new int[]{1, 0},
+                       new int[]{2, 0},
+                       inType,
+                       null,
+                       false);
+       }
+
        /**
         * Applies the given window function to each window. The window 
function is called for each
         * evaluation of the window for each key individually. The output of 
the window function is
@@ -642,8 +669,7 @@ public class AllWindowedStream<T, W extends Window> {
                TypeInformation<V> aggResultType = 
TypeExtractor.getAggregateFunctionReturnType(
                                aggFunction, input.getType(), null, false);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, ProcessAllWindowFunction.class, 
true, true, aggResultType, null, false);
+               TypeInformation<R> resultType = 
getProcessAllWindowFunctionReturnType(windowFunction, aggResultType);
 
                return aggregate(aggFunction, windowFunction, accumulatorType, 
aggResultType, resultType);
        }
@@ -811,8 +837,7 @@ public class AllWindowedStream<T, W extends Window> {
                TypeInformation<ACC> foldAccumulatorType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
                        Utils.getCallLocationName(), true);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, AllWindowFunction.class, true, true, 
foldAccumulatorType, null, false);
+               TypeInformation<R> resultType = 
getAllWindowFunctionReturnType(function, foldAccumulatorType);
 
                return fold(initialValue, foldFunction, function, 
foldAccumulatorType, resultType);
        }
@@ -923,8 +948,7 @@ public class AllWindowedStream<T, W extends Window> {
                TypeInformation<ACC> foldAccumulatorType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
                        Utils.getCallLocationName(), true);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, ProcessAllWindowFunction.class, true, true, 
foldAccumulatorType, null, false);
+               TypeInformation<R> resultType = 
getProcessAllWindowFunctionReturnType(function, foldAccumulatorType);
 
                return fold(initialValue, foldFunction, function, 
foldAccumulatorType, resultType);
        }
@@ -1032,8 +1056,7 @@ public class AllWindowedStream<T, W extends Window> {
        public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, 
W> function) {
                String callLocation = Utils.getCallLocationName();
                function = input.getExecutionEnvironment().clean(function);
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, AllWindowFunction.class, true, true, 
getInputType(), null, false);
+               TypeInformation<R> resultType = 
getAllWindowFunctionReturnType(function, getInputType());
                return apply(new InternalIterableAllWindowFunction<>(function), 
resultType, callLocation);
        }
 
@@ -1069,8 +1092,7 @@ public class AllWindowedStream<T, W extends Window> {
        public <R> SingleOutputStreamOperator<R> 
process(ProcessAllWindowFunction<T, R, W> function) {
                String callLocation = Utils.getCallLocationName();
                function = input.getExecutionEnvironment().clean(function);
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, ProcessAllWindowFunction.class, true, 
true, getInputType(), null, false);
+               TypeInformation<R> resultType = 
getProcessAllWindowFunctionReturnType(function, getInputType());
                return apply(new 
InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
        }
 
@@ -1160,8 +1182,7 @@ public class AllWindowedStream<T, W extends Window> {
        @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> 
reduceFunction, AllWindowFunction<T, R, W> function) {
                TypeInformation<T> inType = input.getType();
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, AllWindowFunction.class, true, true, 
inType, null, false);
+               TypeInformation<R> resultType = 
getAllWindowFunctionReturnType(function, inType);
 
                return apply(reduceFunction, function, resultType);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index 8461d2c..cb18a3f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -67,9 +67,16 @@ public class AsyncDataStream {
                        int bufSize,
                        OutputMode mode) {
 
-               TypeInformation<OUT> outTypeInfo =
-                       TypeExtractor.getUnaryOperatorReturnType(func, 
AsyncFunction.class, false,
-                               true, in.getType(), 
Utils.getCallLocationName(), true);
+               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
+                       func,
+                       AsyncFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       new int[]{1, 0},
+                       in.getType(),
+                       Utils.getCallLocationName(),
+                       true);
 
                // create transform
                AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 8dad1cb..4bbb123 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -235,15 +235,12 @@ public class CoGroupedStreams<T1, T2> {
                 */
                public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> 
function) {
 
-                       TypeInformation<T> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
-                                       function,
-                                       CoGroupFunction.class,
-                                       true,
-                                       true,
-                                       input1.getType(),
-                                       input2.getType(),
-                                       "CoGroup",
-                                       false);
+                       TypeInformation<T> resultType = 
TypeExtractor.getCoGroupReturnTypes(
+                               function,
+                               input1.getType(),
+                               input2.getType(),
+                               "CoGroup",
+                               false);
 
                        return apply(function, resultType);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 0b882c8..e244bd2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -203,9 +203,19 @@ public class ConnectedStreams<IN1, IN2> {
         */
        public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> 
coMapper) {
 
-               TypeInformation<R> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(coMapper,
-                               CoMapFunction.class, false, true, getType1(), 
getType2(),
-                               Utils.getCallLocationName(), true);
+               TypeInformation<R> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(
+                       coMapper,
+                       CoMapFunction.class,
+                       0,
+                       1,
+                       2,
+                       TypeExtractor.NO_INDEX,
+                       TypeExtractor.NO_INDEX,
+                       TypeExtractor.NO_INDEX,
+                       getType1(),
+                       getType2(),
+                       Utils.getCallLocationName(),
+                       true);
 
                return transform("Co-Map", outTypeInfo, new 
CoStreamMap<>(inputStream1.clean(coMapper)));
 
@@ -227,9 +237,19 @@ public class ConnectedStreams<IN1, IN2> {
        public <R> SingleOutputStreamOperator<R> flatMap(
                        CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
 
-               TypeInformation<R> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-                               CoFlatMapFunction.class, false, true, 
getType1(), getType2(),
-                               Utils.getCallLocationName(), true);
+               TypeInformation<R> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(
+                       coFlatMapper,
+                       CoFlatMapFunction.class,
+                       0,
+                       1,
+                       2,
+                       TypeExtractor.NO_INDEX,
+                       TypeExtractor.NO_INDEX,
+                       TypeExtractor.NO_INDEX,
+                       getType1(),
+                       getType2(),
+                       Utils.getCallLocationName(),
+                       true);
 
                return transform("Co-Flat Map", outTypeInfo, new 
CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
        }
@@ -254,9 +274,19 @@ public class ConnectedStreams<IN1, IN2> {
        public <R> SingleOutputStreamOperator<R> process(
                        CoProcessFunction<IN1, IN2, R> coProcessFunction) {
 
-               TypeInformation<R> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
-                               CoProcessFunction.class, false, true, 
getType1(), getType2(),
-                               Utils.getCallLocationName(), true);
+               TypeInformation<R> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(
+                       coProcessFunction,
+                       CoProcessFunction.class,
+                       0,
+                       1,
+                       2,
+                       TypeExtractor.NO_INDEX,
+                       TypeExtractor.NO_INDEX,
+                       TypeExtractor.NO_INDEX,
+                       getType1(),
+                       getType2(),
+                       Utils.getCallLocationName(),
+                       true);
 
                return process(coProcessFunction, outTypeInfo);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0cdc9a1..66cd8e6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -574,13 +574,15 @@ public class DataStream<T> {
        public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> 
processFunction) {
 
                TypeInformation<R> outType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               processFunction,
-                               ProcessFunction.class,
-                               false,
-                               true,
-                               getType(),
-                               Utils.getCallLocationName(),
-                               true);
+                       processFunction,
+                       ProcessFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       new int[]{2, 0},
+                       getType(),
+                       Utils.getCallLocationName(),
+                       true);
 
                return process(processFunction, outType);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index e1ffe86..f23ebcf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -221,14 +221,18 @@ public class JoinedStreams<T1, T2> {
                 */
                public <T> DataStream<T> apply(JoinFunction<T1, T2, T> 
function) {
                        TypeInformation<T> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
-                                       function,
-                                       JoinFunction.class,
-                                       true,
-                                       true,
-                                       input1.getType(),
-                                       input2.getType(),
-                                       "Join",
-                                       false);
+                               function,
+                               JoinFunction.class,
+                               0,
+                               1,
+                               2,
+                               new int[]{0},
+                               new int[]{1},
+                               TypeExtractor.NO_INDEX,
+                               input1.getType(),
+                               input2.getType(),
+                               "Join",
+                               false);
 
                        return apply(function, resultType);
                }
@@ -300,14 +304,18 @@ public class JoinedStreams<T1, T2> {
                 */
                public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> 
function) {
                        TypeInformation<T> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
-                                       function,
-                                       FlatJoinFunction.class,
-                                       true,
-                                       true,
-                                       input1.getType(),
-                                       input2.getType(),
-                                       "Join",
-                                       false);
+                               function,
+                               FlatJoinFunction.class,
+                               0,
+                               1,
+                               2,
+                               new int[]{0},
+                               new int[]{1},
+                               new int[]{2, 0},
+                               input1.getType(),
+                               input2.getType(),
+                               "Join",
+                               false);
 
                        return apply(function, resultType);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 698deb8..851e614 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -267,13 +267,15 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
        public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> 
processFunction) {
 
                TypeInformation<R> outType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               processFunction,
-                               ProcessFunction.class,
-                               false,
-                               true,
-                               getType(),
-                               Utils.getCallLocationName(),
-                               true);
+                       processFunction,
+                       ProcessFunction.class,
+                       0,
+                       1,
+                       new int[]{0},
+                       new int[]{2, 0},
+                       getType(),
+                       Utils.getCallLocationName(),
+                       true);
 
                return process(processFunction, outType);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index f8a1914..a795064 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -295,8 +295,7 @@ public class WindowedStream<T, K, W extends Window> {
                        LegacyWindowOperatorType legacyWindowOpType) {
 
                TypeInformation<T> inType = input.getType();
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, WindowFunction.class, true, true, inType, 
null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, inType);
 
                return reduce(reduceFunction, function, resultType, 
legacyWindowOpType);
        }
@@ -396,8 +395,7 @@ public class WindowedStream<T, K, W extends Window> {
         */
        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, ProcessWindowFunction.class, true, 
true, input.getType(), null, false);
+               TypeInformation<R> resultType = 
getProcessWindowFunctionReturnType(function, input.getType(), null);
 
                return reduce(reduceFunction, function, resultType);
        }
@@ -544,8 +542,7 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<ACC> foldAccumulatorType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
                        Utils.getCallLocationName(), true);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, WindowFunction.class, true, true, 
foldAccumulatorType, null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, foldAccumulatorType);
 
                return fold(initialValue, foldFunction, function, 
foldAccumulatorType, resultType);
        }
@@ -663,8 +660,7 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<ACC> foldResultType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
                                Utils.getCallLocationName(), true);
 
-               TypeInformation<R> windowResultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, ProcessWindowFunction.class, 
true, true, foldResultType, Utils.getCallLocationName(), false);
+               TypeInformation<R> windowResultType = 
getProcessWindowFunctionReturnType(windowFunction, foldResultType, 
Utils.getCallLocationName());
 
                return fold(initialValue, foldFunction, windowFunction, 
foldResultType, windowResultType);
        }
@@ -852,8 +848,7 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<V> aggResultType = 
TypeExtractor.getAggregateFunctionReturnType(
                                aggFunction, input.getType(), null, false);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, WindowFunction.class, true, 
true, aggResultType, null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(windowFunction, aggResultType);
 
                return aggregate(aggFunction, windowFunction, accumulatorType, 
aggResultType, resultType);
        }
@@ -981,12 +976,42 @@ public class WindowedStream<T, K, W extends Window> {
                TypeInformation<V> aggResultType = 
TypeExtractor.getAggregateFunctionReturnType(
                                aggFunction, input.getType(), null, false);
 
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               windowFunction, ProcessWindowFunction.class, 
true, true, aggResultType, null, false);
+               TypeInformation<R> resultType = 
getProcessWindowFunctionReturnType(windowFunction, aggResultType, null);
 
                return aggregate(aggFunction, windowFunction, accumulatorType, 
aggResultType, resultType);
        }
 
+       private static <IN, OUT, KEY> TypeInformation<OUT> 
getWindowFunctionReturnType(
+               WindowFunction<IN, OUT, KEY, ?> function,
+               TypeInformation<IN> inType) {
+               return TypeExtractor.getUnaryOperatorReturnType(
+                       function,
+                       WindowFunction.class,
+                       0,
+                       1,
+                       new int[]{2, 0},
+                       new int[]{3, 0},
+                       inType,
+                       null,
+                       false);
+       }
+
+       private static <IN, OUT, KEY> TypeInformation<OUT> 
getProcessWindowFunctionReturnType(
+                       ProcessWindowFunction<IN, OUT, KEY, ?> function,
+                       TypeInformation<IN> inType,
+                       String functionName) {
+               return TypeExtractor.getUnaryOperatorReturnType(
+                       function,
+                       ProcessWindowFunction.class,
+                       0,
+                       1,
+                       new int[]{2, 0},
+                       new int[]{3, 0},
+                       inType,
+                       functionName,
+                       false);
+       }
+
        /**
         * Applies the given window function to each window. The window 
function is called for each
         * evaluation of the window for each key individually. The output of 
the window function is
@@ -1094,8 +1119,7 @@ public class WindowedStream<T, K, W extends Window> {
         * @return The data stream that is the result of applying the window 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, 
W> function) {
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, WindowFunction.class, true, true, 
getInputType(), null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, getInputType());
 
                return apply(function, resultType);
        }
@@ -1131,8 +1155,7 @@ public class WindowedStream<T, K, W extends Window> {
         */
        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> 
process(ProcessWindowFunction<T, R, K, W> function) {
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                       function, ProcessWindowFunction.class, true, true, 
getInputType(), null, false);
+               TypeInformation<R> resultType = 
getProcessWindowFunctionReturnType(function, getInputType(), null);
 
                return process(function, resultType);
        }
@@ -1231,8 +1254,7 @@ public class WindowedStream<T, K, W extends Window> {
        @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function) {
                TypeInformation<T> inType = input.getType();
-               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
-                               function, WindowFunction.class, true, true, 
inType, null, false);
+               TypeInformation<R> resultType = 
getWindowFunctionReturnType(function, inType);
 
                return apply(reduceFunction, function, resultType);
        }

Reply via email to