[FLINK-6783] Changed passing index of type argument while extracting return type.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcaf816d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcaf816d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcaf816d Branch: refs/heads/master Commit: bcaf816dc5313c6c7de1e3436cc87036fd2ea3d0 Parents: 1cc1bb4 Author: Dawid Wysakowicz <[email protected]> Authored: Thu Jun 1 13:17:25 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Jun 8 10:42:41 2017 +0200 ---------------------------------------------------------------------- .../flink/api/common/functions/Partitioner.java | 6 +- .../api/java/typeutils/TypeExtractionUtils.java | 74 +++ .../flink/api/java/typeutils/TypeExtractor.java | 642 ++++++++++++------- .../java/type/lambdas/LambdaExtractionTest.java | 13 + .../org/apache/flink/cep/CEPLambdaTest.java | 2 - .../org/apache/flink/cep/PatternStream.java | 24 +- .../cep/operator/CEPMigration11to13Test.java | 4 +- .../flink/graph/asm/translate/Translate.java | 44 +- .../api/datastream/AllWindowedStream.java | 57 +- .../api/datastream/AsyncDataStream.java | 13 +- .../api/datastream/CoGroupedStreams.java | 15 +- .../api/datastream/ConnectedStreams.java | 48 +- .../streaming/api/datastream/DataStream.java | 16 +- .../streaming/api/datastream/JoinedStreams.java | 40 +- .../streaming/api/datastream/KeyedStream.java | 16 +- .../api/datastream/WindowedStream.java | 58 +- .../windowing/WindowTranslationTest.java | 232 +++---- 17 files changed, 846 insertions(+), 458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java index 6c237ed..c272d3a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java @@ -22,15 +22,15 @@ import org.apache.flink.annotation.Public; /** * Function to implement a custom partition assignment for keys. - * + * * @param <K> The type of the key to be partitioned. */ @Public -public interface Partitioner<K> extends java.io.Serializable { +public interface Partitioner<K> extends java.io.Serializable, Function { /** * Computes the partition for the given key. - * + * * @param key The key. * @param numPartitions The number of partitions to partition into. * @return The partition index. http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java index 0aac257..c6ffd55 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils; import java.lang.reflect.Constructor; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; @@ -28,6 +29,8 @@ import java.util.Collections; import java.util.List; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.InvalidTypesException; + import static 
org.objectweb.asm.Type.getConstructorDescriptor; import static org.objectweb.asm.Type.getMethodDescriptor; @@ -161,6 +164,77 @@ public class TypeExtractionUtils { } /** + * Extracts type from given index from lambda. It supports nested types. + * + * @param exec lambda function to extract the type from + * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy + * @param paramLen count of total parameters of the lambda (including closure parameters) + * @param baseParametersLen count of lambda interface parameters (without closure parameters) + * @return extracted type + */ + public static Type extractTypeFromLambda( + LambdaExecutable exec, + int[] lambdaTypeArgumentIndices, + int paramLen, + int baseParametersLen) { + Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]]; + for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) { + output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]); + } + return output; + } + + /** + * This method extracts the n-th type argument from the given type. An InvalidTypesException + * is thrown if the type does not have any type arguments or if the index exceeds the number + * of type arguments. + * + * @param t Type to extract the type arguments from + * @param index Index of the type argument to extract + * @return The extracted type argument + * @throws InvalidTypesException if the given type does not have any type arguments or if the + * index exceeds the number of type arguments. 
+ */ + public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException { + if (t instanceof ParameterizedType) { + Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments(); + + if (index < 0 || index >= actualTypeArguments.length) { + throw new InvalidTypesException("Cannot extract the type argument with index " + + index + " because the type has only " + actualTypeArguments.length + + " type arguments."); + } else { + return actualTypeArguments[index]; + } + } else { + throw new InvalidTypesException("The given type " + t + " is not a parameterized type."); + } + } + + /** + * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object, + * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class. + * + * @param baseClass + * @throws InvalidTypesException if the given class does not implement + * @return + */ + public static Method getSingleAbstractMethod(Class<?> baseClass) { + Method sam = null; + for (Method method : baseClass.getMethods()) { + if (Modifier.isAbstract(method.getModifiers())) { + if (sam == null) { + sam = method; + } else { + throw new InvalidTypesException( + "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM."); + } + } + } + return sam; + } + + /** * Returns all declared methods of a class including methods of superclasses. 
*/ public static List<Method> getAllDeclaredMethods(Class<?> clazz) { http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 112ca38..c50dfc9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -116,6 +116,8 @@ public class TypeExtractor { private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); + public static final int[] NO_INDEX = new int[] {}; + protected TypeExtractor() { // only create instances for special use cases } @@ -161,9 +163,18 @@ public class TypeExtractor { public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) mapInterface, + MapFunction.class, + 0, + 1, + new int[]{0}, + NO_INDEX, + inType, + functionName, + allowMissing); } - + @PublicEvolving public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) { @@ -174,7 +185,16 @@ public class TypeExtractor { public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing); + return 
getUnaryOperatorReturnType( + (Function) flatMapInterface, + FlatMapFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1, 0}, + inType, + functionName, + allowMissing); } /** @@ -194,7 +214,16 @@ public class TypeExtractor { @Deprecated public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) foldInterface, + FoldFunction.class, + 0, + 1, + new int[]{1}, + NO_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -205,7 +234,15 @@ public class TypeExtractor { boolean allowMissing) { return getUnaryOperatorReturnType( - function, AggregateFunction.class, 0, 1, inType, functionName, allowMissing); + function, + AggregateFunction.class, + 0, + 1, + new int[]{0}, + NO_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -216,7 +253,15 @@ public class TypeExtractor { boolean allowMissing) { return getUnaryOperatorReturnType( - function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing); + function, + AggregateFunction.class, + 0, + 2, + NO_INDEX, + NO_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -228,7 +273,16 @@ public class TypeExtractor { public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) mapPartitionInterface, + MapPartitionFunction.class, + 0, + 1, + new int[]{0, 0}, + new int[]{1, 0}, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -240,7 
+294,16 @@ public class TypeExtractor { public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) groupReduceInterface, + GroupReduceFunction.class, + 0, + 1, + new int[]{0, 0}, + new int[]{1, 0}, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -252,7 +315,16 @@ public class TypeExtractor { public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) combineInterface, + GroupCombineFunction.class, + 0, + 1, + new int[]{0, 0}, + new int[]{1, 0}, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -266,8 +338,19 @@ public class TypeExtractor { public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) joinInterface, + FlatJoinFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + new int[]{2, 0}, + in1Type, + in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -281,8 +364,19 @@ public class TypeExtractor { public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> 
joinInterface, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) joinInterface, + JoinFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + NO_INDEX, + in1Type, + in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -296,8 +390,19 @@ public class TypeExtractor { public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) coGroupInterface, + CoGroupFunction.class, + 0, + 1, + 2, + new int[]{0, 0}, + new int[]{1, 0}, + new int[]{2, 0}, + in1Type, + in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -311,8 +416,19 @@ public class TypeExtractor { public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) crossInterface, + CrossFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + NO_INDEX, + in1Type, + in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -324,7 +440,16 @@ public class TypeExtractor { public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType, String functionName, 
boolean allowMissing) { - return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) selectorInterface, + KeySelector.class, + 0, + 1, + new int[]{0}, + NO_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -333,11 +458,53 @@ public class TypeExtractor { } @PublicEvolving - public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner, String functionName, boolean allowMissing) { - return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null); + public static <T> TypeInformation<T> getPartitionerTypes( + Partitioner<T> partitioner, + String functionName, + boolean allowMissing) { + try { + final LambdaExecutable exec; + try { + exec = checkAndExtractLambda(partitioner); + } catch (TypeExtractionException e) { + throw new InvalidTypesException("Internal error occurred.", e); + } + if (exec != null) { + // check for lambda type erasure + validateLambdaGenericParameters(exec); + + // parameters must be accessed from behind, since JVM can add additional parameters e.g. 
when using local variables inside lambda function + // paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure + final int paramLen = exec.getParameterTypes().length; + + final Method sam = TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class); + // number of parameters the SAM of implemented interface has, the parameter indexing applies to this range + final int baseParametersLen = sam.getParameterTypes().length; + + final Type keyType = TypeExtractionUtils.extractTypeFromLambda( + exec, + new int[]{0}, + paramLen, + baseParametersLen); + return new TypeExtractor().privateCreateTypeInfo(keyType, null, null); + } else { + return new TypeExtractor().privateCreateTypeInfo( + Partitioner.class, + partitioner.getClass(), + 0, + null, + null); + } + } catch (InvalidTypesException e) { + if (allowMissing) { + return (TypeInformation<T>) new MissingTypeInfo(functionName != null ? functionName : partitioner.toString(), e); + } else { + throw e; + } + } } - - + + @SuppressWarnings("unchecked") @PublicEvolving public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) { @@ -354,49 +521,21 @@ public class TypeExtractor { /** * Returns the unary operator's return type. 
* - * @param function Function to extract the return type from - * @param baseClass Base class of the function - * @param hasIterable True if the first function parameter is an iterable, otherwise false - * @param hasCollector True if the function has an additional collector parameter, otherwise false - * @param inType Type of the input elements (In case of an iterable, it is the element type) - * @param functionName Function name - * @param allowMissing Can the type information be missing - * @param <IN> Input type - * @param <OUT> Output type - * @return TypeInformation of the return type of the function - */ - @SuppressWarnings("unchecked") - @PublicEvolving - public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType( - Function function, - Class<?> baseClass, - boolean hasIterable, - boolean hasCollector, - TypeInformation<IN> inType, - String functionName, - boolean allowMissing) { - - return getUnaryOperatorReturnType( - function, - baseClass, - hasIterable ? 0 : -1, - hasCollector ? 0 : -1, - inType, - functionName, - allowMissing); - } - - /** - * Returns the unary operator's return type. + * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. To extract input type <b>IN</b> + * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices. + * + * <pre> + * <code> + * OUT apply(Map<String, List<IN>> value) + * </code> + * </pre> * * @param function Function to extract the return type from * @param baseClass Base class of the function - * @param inputTypeArgumentIndex Index of the type argument of function's first parameter - * specifying the input type if it is wrapped (Iterable, Map, - * etc.). Otherwise -1. - * @param outputTypeArgumentIndex Index of the type argument of function's second parameter - * specifying the output type if it is wrapped in a Collector. - * Otherwise -1. 
+ * @param inputTypeArgumentIndex Index of input type in the class specification + * @param outputTypeArgumentIndex Index of output type in the class specification + * @param lambdaInputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example. + * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example. * @param inType Type of the input elements (In case of an iterable, it is the element type) * @param functionName Function name * @param allowMissing Can the type information be missing @@ -411,6 +550,8 @@ public class TypeExtractor { Class<?> baseClass, int inputTypeArgumentIndex, int outputTypeArgumentIndex, + int[] lambdaInputTypeArgumentIndices, + int[] lambdaOutputTypeArgumentIndices, TypeInformation<IN> inType, String functionName, boolean allowMissing) { @@ -422,37 +563,63 @@ public class TypeExtractor { throw new InvalidTypesException("Internal error occurred.", e); } if (exec != null) { + Preconditions.checkArgument( + lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1, + "Indices for input type arguments within lambda not provided"); + Preconditions.checkArgument( + lambdaOutputTypeArgumentIndices != null, + "Indices for output type arguments within lambda not provided"); // check for lambda type erasure validateLambdaGenericParameters(exec); // parameters must be accessed from behind, since JVM can add additional parameters e.g. 
when using local variables inside lambda function - final int paramLen = exec.getParameterTypes().length - 1; + // paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure + final int paramLen = exec.getParameterTypes().length; + + final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass); + + // number of parameters the SAM of implemented interface has, the parameter indexing applies to this range + final int baseParametersLen = sam.getParameterTypes().length; // executable references "this" implicitly - if (paramLen < 0) { + if (paramLen <= 0) { // executable declaring class can also be a super class of the input type // we only validate if the executable exists in input type validateInputContainsExecutable(exec, inType); } else { - final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen]; - validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType); + final Type input = TypeExtractionUtils.extractTypeFromLambda( + exec, + lambdaInputTypeArgumentIndices, + paramLen, + baseParametersLen); + validateInputType(input, inType); } if (function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) function).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo( - (outputTypeArgumentIndex >= 0) ? 
extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(), - inType, - null); - } - else { - validateInputType(baseClass, function.getClass(), 0, inType); + + final Type output; + if (lambdaOutputTypeArgumentIndices.length > 0) { + output = TypeExtractionUtils.extractTypeFromLambda( + exec, + lambdaOutputTypeArgumentIndices, + paramLen, + baseParametersLen); + } else { + output = exec.getReturnType(); + } + + return new TypeExtractor().privateCreateTypeInfo(output, inType, null); + } else { + Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided"); + Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided"); + validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) function).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null); + return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, inType, null); } } catch (InvalidTypesException e) { @@ -467,54 +634,23 @@ public class TypeExtractor { /** * Returns the binary operator's return type. 
* - * @param function Function to extract the return type from - * @param baseClass Base class of the function - * @param hasIterables True if the first function parameter is an iterable, otherwise false - * @param hasCollector True if the function has an additional collector parameter, otherwise false - * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type) - * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type) - * @param functionName Function name - * @param allowMissing Can the type information be missing - * @param <IN1> Left side input type - * @param <IN2> Right side input type - * @param <OUT> Output type - * @return TypeInformation of the return type of the function - */ - @SuppressWarnings("unchecked") - @PublicEvolving - public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType( - Function function, - Class<?> baseClass, - boolean hasIterables, - boolean hasCollector, - TypeInformation<IN1> in1Type, - TypeInformation<IN2> in2Type, - String functionName, - boolean allowMissing) { - - return getBinaryOperatorReturnType( - function, - baseClass, - hasIterables ? 0 : -1, - hasCollector ? 0 : -1, - in1Type, - in2Type, - functionName, - allowMissing - ); - } - - /** - * Returns the binary operator's return type. + * <p><b>NOTE:</b> lambda type indices allows extraction of Type from lambdas. To extract input type <b>IN1</b> + * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInput1TypeArgumentIndices. + * + * <pre> + * <code> + * OUT apply(Map<String, List<IN1>> value1, List<IN2> value2) + * </code> + * </pre> * * @param function Function to extract the return type from * @param baseClass Base class of the function - * @param inputTypeArgumentIndex Index of the type argument of function's first parameter - * specifying the input type if it is wrapped (Iterable, Map, - * etc.). Otherwise -1. 
- * @param outputTypeArgumentIndex Index of the type argument of functions second parameter - * specifying the output type if it is wrapped in a Collector. - * Otherwise -1. + * @param input1TypeArgumentIndex Index of first input type in the class specification + * @param input2TypeArgumentIndex Index of second input type in the class specification + * @param outputTypeArgumentIndex Index of output type in the class specification + * @param lambdaInput1TypeArgumentIndices Table of indices of the type argument specifying the first input type. See example. + * @param lambdaInput2TypeArgumentIndices Table of indices of the type argument specifying the second input type. See example. + * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example. * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type) * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type) * @param functionName Function name @@ -529,8 +665,12 @@ public class TypeExtractor { public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType( Function function, Class<?> baseClass, - int inputTypeArgumentIndex, + int input1TypeArgumentIndex, + int input2TypeArgumentIndex, int outputTypeArgumentIndex, + int[] lambdaInput1TypeArgumentIndices, + int[] lambdaInput2TypeArgumentIndices, + int[] lambdaOutputTypeArgumentIndices, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, @@ -543,30 +683,67 @@ public class TypeExtractor { throw new InvalidTypesException("Internal error occurred.", e); } if (exec != null) { + Preconditions.checkArgument( + lambdaInput1TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1, + "Indices for first input type arguments within lambda not provided"); + Preconditions.checkArgument( + lambdaInput2TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1, + 
"Indices for second input type arguments within lambda not provided"); + Preconditions.checkArgument( + lambdaOutputTypeArgumentIndices != null, + "Indices for output type arguments within lambda not provided"); // check for lambda type erasure validateLambdaGenericParameters(exec); - + + final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass); + final int baseParametersLen = sam.getParameterTypes().length; + // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function - final int paramLen = exec.getParameterTypes().length - 1; - final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1]; - final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen]; - validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type); - validateInputType((inputTypeArgumentIndex >= 0) ? 
extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type); + final int paramLen = exec.getParameterTypes().length; + + final Type input1 = TypeExtractionUtils.extractTypeFromLambda( + exec, + lambdaInput1TypeArgumentIndices, + paramLen, + baseParametersLen); + final Type input2 = TypeExtractionUtils.extractTypeFromLambda( + exec, + lambdaInput2TypeArgumentIndices, + paramLen, + baseParametersLen); + + validateInputType(input1, in1Type); + validateInputType(input2, in2Type); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) function).getProducedType(); } + + final Type output; + if (lambdaOutputTypeArgumentIndices.length > 0) { + output = TypeExtractionUtils.extractTypeFromLambda( + exec, + lambdaOutputTypeArgumentIndices, + paramLen, + baseParametersLen); + } else { + output = exec.getReturnType(); + } + return new TypeExtractor().privateCreateTypeInfo( - (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(), + output, in1Type, in2Type); } else { - validateInputType(baseClass, function.getClass(), 0, in1Type); - validateInputType(baseClass, function.getClass(), 1, in2Type); + Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided"); + Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided"); + Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided"); + validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type); + validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) function).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type); + return new TypeExtractor().privateCreateTypeInfo(baseClass, 
function.getClass(), outputTypeArgumentIndex, in1Type, in2Type); } } catch (InvalidTypesException e) { @@ -577,7 +754,7 @@ public class TypeExtractor { } } } - + // -------------------------------------------------------------------------------------------- // Create type information // -------------------------------------------------------------------------------------------- @@ -586,7 +763,7 @@ public class TypeExtractor { public static <T> TypeInformation<T> createTypeInfo(Class<T> type) { return (TypeInformation<T>) createTypeInfo((Type) type); } - + public static TypeInformation<?> createTypeInfo(Type t) { TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t); if (ti == null) { @@ -628,46 +805,46 @@ public class TypeExtractor { } return ti; } - + // ----------------------------------- private methods ---------------------------------------- - + private TypeInformation<?> privateCreateTypeInfo(Type t) { ArrayList<Type> typeHierarchy = new ArrayList<Type>(); typeHierarchy.add(t); return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null); } - + // for (Rich)Functions @SuppressWarnings("unchecked") private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { ArrayList<Type> typeHierarchy = new ArrayList<Type>(); Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos); - + TypeInformation<OUT> typeInfo; - + // return type is a variable -> try to get the type info from the input directly if (returnType instanceof TypeVariable<?>) { typeInfo = (TypeInformation<OUT>) createTypeInfoFromInputs((TypeVariable<?>) returnType, typeHierarchy, in1Type, in2Type); - + if (typeInfo != null) { return typeInfo; } } - + // get info from hierarchy return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); } - + // for LambdaFunctions 
@SuppressWarnings("unchecked") private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { ArrayList<Type> typeHierarchy = new ArrayList<Type>(); - + // get info from hierarchy return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); } - + @SuppressWarnings({ "unchecked", "rawtypes" }) private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { @@ -680,29 +857,29 @@ public class TypeExtractor { // check if type is a subclass of tuple else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) { Type curT = t; - + // do not allow usage of Tuple as type if (typeToClass(t).equals(Tuple.class)) { throw new InvalidTypesException( "Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead."); } - + // go up the hierarchy until we reach immediate child of Tuple (with or without generics) - // collect the types while moving up for a later top-down + // collect the types while moving up for a later top-down while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) { typeHierarchy.add(curT); curT = typeToClass(curT).getGenericSuperclass(); } - + if(curT == Tuple0.class) { return new TupleTypeInfo(Tuple0.class); } - + // check if immediate child of Tuple has generics if (curT instanceof Class<?>) { throw new InvalidTypesException("Tuple needs to be parameterized by using generics."); } - + typeHierarchy.add(curT); // create the type information for the subtypes @@ -718,13 +895,13 @@ public class TypeExtractor { } // return tuple info return new TupleTypeInfo(typeToClass(t), subTypesInfo); - + } // type depends on another type // e.g. 
class MyMapper<E> extends MapFunction<String, E> else if (t instanceof TypeVariable) { Type typeVar = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) t); - + if (!(typeVar instanceof TypeVariable)) { return createTypeInfoWithTypeHierarchy(typeHierarchy, typeVar, in1Type, in2Type); } @@ -741,12 +918,12 @@ public class TypeExtractor { } } } - // arrays with generics + // arrays with generics else if (t instanceof GenericArrayType) { GenericArrayType genericArray = (GenericArrayType) t; - + Type componentType = genericArray.getGenericComponentType(); - + // due to a Java 6 bug, it is possible that the JVM classifies e.g. String[] or int[] as GenericArrayType instead of Class if (componentType instanceof Class) { Class<?> componentClass = (Class<?>) componentType; @@ -775,11 +952,11 @@ public class TypeExtractor { else if (t instanceof Class) { return privateGetForClass((Class<OUT>) t, typeHierarchy); } - + throw new InvalidTypesException("Type Information could not be created."); } - private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, + private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) { Type matReturnTypeVar = materializeTypeVariable(returnTypeHierarchy, returnTypeVar); @@ -791,12 +968,12 @@ public class TypeExtractor { else { returnTypeVar = (TypeVariable<?>) matReturnTypeVar; } - + // no input information exists if (in1TypeInfo == null && in2TypeInfo == null) { return null; } - + // create a new type hierarchy for the input ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>(); // copy the function part of the type hierarchy @@ -809,7 +986,7 @@ public class TypeExtractor { } } ParameterizedType baseClass = (ParameterizedType) inputTypeHierarchy.get(inputTypeHierarchy.size() - 1); - + TypeInformation<?> info = null; if 
(in1TypeInfo != null) { // find the deepest type variable that describes the type of input 1 @@ -898,18 +1075,18 @@ public class TypeExtractor { // the input is a tuple else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) { ParameterizedType tupleBaseClass; - + // get tuple from possible tuple subclass while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) { inputTypeHierarchy.add(inType); inType = typeToClass(inType).getGenericSuperclass(); } inputTypeHierarchy.add(inType); - + // we can assume to be parameterized since we // already did input validation tupleBaseClass = (ParameterizedType) inType; - + Type[] tupleElements = tupleBaseClass.getActualTypeArguments(); // go thru all tuple elements and search for type variables for (int i = 0; i < tupleElements.length; i++) { @@ -1068,13 +1245,13 @@ public class TypeExtractor { public static Type getParameterType(Class<?> baseClass, Class<?> clazz, int pos) { return getParameterType(baseClass, null, clazz, pos); } - + private static Type getParameterType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) { if (typeHierarchy != null) { typeHierarchy.add(clazz); } Type[] interfaceTypes = clazz.getGenericInterfaces(); - + // search in interfaces for base class for (Type t : interfaceTypes) { Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos); @@ -1082,18 +1259,18 @@ public class TypeExtractor { return parameter; } } - + // search in superclass for base class Type t = clazz.getGenericSuperclass(); Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos); if (parameter != null) { return parameter; } - - throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " + + + throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. 
" + "Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point"); } - + private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) { // base class if (t instanceof ParameterizedType && baseClass.equals(((ParameterizedType) t).getRawType())) { @@ -1109,7 +1286,7 @@ public class TypeExtractor { typeHierarchy.add(t); } return getParameterType(baseClass, typeHierarchy, (Class<?>) ((ParameterizedType) t).getRawType(), pos); - } + } else if (t instanceof Class<?> && baseClass.isAssignableFrom((Class<?>) t)) { if (typeHierarchy != null) { typeHierarchy.add(t); @@ -1118,11 +1295,11 @@ public class TypeExtractor { } return null; } - + // -------------------------------------------------------------------------------------------- // Validate input // -------------------------------------------------------------------------------------------- - + private static void validateInputType(Type t, TypeInformation<?> inType) { ArrayList<Type> typeHierarchy = new ArrayList<Type>(); try { @@ -1132,7 +1309,7 @@ public class TypeExtractor { throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); } } - + private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) { ArrayList<Type> typeHierarchy = new ArrayList<Type>(); @@ -1152,21 +1329,21 @@ public class TypeExtractor { throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); } } - + @SuppressWarnings("unchecked") private static void validateInfo(ArrayList<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) { if (type == null) { throw new InvalidTypesException("Unknown Error. Type is null."); } - + if (typeInfo == null) { throw new InvalidTypesException("Unknown Error. 
TypeInformation is null."); } - + if (!(type instanceof TypeVariable<?>)) { // check for Java Basic Types if (typeInfo instanceof BasicTypeInfo) { - + TypeInformation<?> actual; // check if basic type at all if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) { @@ -1176,7 +1353,7 @@ public class TypeExtractor { if (!typeInfo.equals(actual)) { throw new InvalidTypesException("Basic type '" + typeInfo + "' expected but was '" + actual + "'."); } - + } // check for Java SQL time types else if (typeInfo instanceof SqlTimeTypeInfo) { @@ -1198,36 +1375,36 @@ public class TypeExtractor { if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) { throw new InvalidTypesException("Tuple type expected."); } - + // do not allow usage of Tuple as type if (isClassType(type) && typeToClass(type).equals(Tuple.class)) { throw new InvalidTypesException("Concrete subclass of Tuple expected."); } - + // go up the hierarchy until we reach immediate child of Tuple (with or without generics) while (!(isClassType(type) && typeToClass(type).getSuperclass().equals(Tuple.class))) { typeHierarchy.add(type); type = typeToClass(type).getGenericSuperclass(); } - + if(type == Tuple0.class) { return; } - + // check if immediate child of Tuple has generics if (type instanceof Class<?>) { throw new InvalidTypesException("Parameterized Tuple type expected."); } - + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) typeInfo; - + Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments(); - + if (subTypes.length != tti.getArity()) { throw new InvalidTypesException("Tuple arity '" + tti.getArity() + "' expected but was '" + subTypes.length + "'."); } - + for (int i = 0; i < subTypes.length; i++) { validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i)); } @@ -1258,16 +1435,16 @@ public class TypeExtractor { && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) 
!= null)) { throw new InvalidTypesException("Array type expected."); } - + if (component instanceof TypeVariable<?>) { component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component); if (component instanceof TypeVariable) { return; } } - + validateInfo(typeHierarchy, component, ((BasicArrayTypeInfo<?, ?>) typeInfo).getComponentInfo()); - + } // check for object array else if (typeInfo instanceof ObjectArrayTypeInfo<?, ?>) { @@ -1275,7 +1452,7 @@ public class TypeExtractor { if (!(type instanceof Class<?> && ((Class<?>) type).isArray()) && !(type instanceof GenericArrayType)) { throw new InvalidTypesException("Object array type expected."); } - + // check component Type component; if (type instanceof Class<?>) { @@ -1283,14 +1460,14 @@ public class TypeExtractor { } else { component = ((GenericArrayType) type).getGenericComponentType(); } - + if (component instanceof TypeVariable<?>) { component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component); if (component instanceof TypeVariable) { return; } } - + validateInfo(typeHierarchy, component, ((ObjectArrayTypeInfo<?, ?>) typeInfo).getComponentInfo()); } // check for value @@ -1299,7 +1476,7 @@ public class TypeExtractor { if (!(type instanceof Class<?> && Value.class.isAssignableFrom((Class<?>) type))) { throw new InvalidTypesException("Value type expected."); } - + TypeInformation<?> actual; // check value type contents if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) { @@ -1456,33 +1633,6 @@ public class TypeExtractor { return fieldCount; } - /** - * * This method extracts the n-th type argument from the given type. An InvalidTypesException - * is thrown if the type does not have any type arguments or if the index exceeds the number - * of type arguments. 
- * - * @param t Type to extract the type arguments from - * @param index Index of the type argument to extract - * @return The extracted type argument - * @throws InvalidTypesException if the given type does not have any type arguments or if the - * index exceeds the number of type arguments. - */ - private static Type extractTypeArgument(Type t, int index) throws InvalidTypesException { - if(t instanceof ParameterizedType) { - Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments(); - - if (index < 0 || index >= actualTypeArguments.length) { - throw new InvalidTypesException("Cannot extract the type argument with index " + - index + " because the type has only " + actualTypeArguments.length + - " type arguments."); - } else { - return actualTypeArguments[index]; - } - } else { - throw new InvalidTypesException("The given type " + t + " is not a parameterized type."); - } - } - private static void validateLambdaGenericParameters(LambdaExecutable exec) { // check the arguments for (Type t : exec.getParameterTypes()) { @@ -1516,19 +1666,19 @@ public class TypeExtractor { // iterate thru hierarchy from top to bottom until type variable gets a class assigned for (int i = typeHierarchy.size() - 1; i >= 0; i--) { Type curT = typeHierarchy.get(i); - + // parameterized type if (curT instanceof ParameterizedType) { Class<?> rawType = ((Class<?>) ((ParameterizedType) curT).getRawType()); - + for (int paramIndex = 0; paramIndex < rawType.getTypeParameters().length; paramIndex++) { - + TypeVariable<?> curVarOfCurT = rawType.getTypeParameters()[paramIndex]; - + // check if variable names match if (sameTypeVars(curVarOfCurT, inTypeTypeVar)) { Type curVarType = ((ParameterizedType) curT).getActualTypeArguments()[paramIndex]; - + // another type variable level if (curVarType instanceof TypeVariable<?>) { inTypeTypeVar = (TypeVariable<?>) curVarType; @@ -1545,14 +1695,14 @@ public class TypeExtractor { // return the type variable of the deepest level return 
inTypeTypeVar; } - + /** * Creates type information from a given Class such as Integer, String[] or POJOs. - * - * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies. + * + * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies. * In most cases {@link TypeExtractor#createTypeInfo(Type)} is the recommended method for type extraction * (a Class is a child of Type). - * + * * @param clazz a Class to create TypeInformation for * @return TypeInformation that describes the passed Class */ @@ -1561,7 +1711,7 @@ public class TypeExtractor { typeHierarchy.add(clazz); return new TypeExtractor().privateGetForClass(clazz, typeHierarchy); } - + private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) { return privateGetForClass(clazz, typeHierarchy, null, null, null); } @@ -1600,13 +1750,13 @@ public class TypeExtractor { if (primitiveArrayInfo != null) { return primitiveArrayInfo; } - + // basic type arrays: String[], Integer[], Double[] BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz); if (basicArrayInfo != null) { return basicArrayInfo; } - + // object arrays else { TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy( @@ -1618,7 +1768,7 @@ public class TypeExtractor { return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo); } } - + // check for writable types if (isHadoopWritable(clazz)) { return createHadoopWritableTypeInfo(clazz); @@ -1635,13 +1785,13 @@ public class TypeExtractor { if (timeTypeInfo != null) { return timeTypeInfo; } - + // check for subclasses of Value if (Value.class.isAssignableFrom(clazz)) { Class<? 
extends Value> valueClass = clazz.asSubclass(Value.class); return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass); } - + // check for subclasses of Tuple if (Tuple.class.isAssignableFrom(clazz)) { if(clazz == Tuple0.class) { @@ -1680,13 +1830,13 @@ public class TypeExtractor { // return a generic type return new GenericTypeInfo<OUT>(clazz); } - + /** * Checks if the given field is a valid pojo field: * - it is public * OR * - there are getter and setter methods for the field. - * + * * @param f field to check * @param clazz class of field * @param typeHierarchy type hierarchy for materializing generic types @@ -1753,7 +1903,7 @@ public class TypeExtractor { LOG.info("Class " + clazz.getName() + " is not public, cannot treat it as a POJO type. Will be handled as GenericType"); return new GenericTypeInfo<OUT>(clazz); } - + // add the hierarchy of the POJO itself if it is generic if (parameterizedType != null) { getTypeHierarchy(typeHierarchy, parameterizedType, Object.class); @@ -1762,7 +1912,7 @@ public class TypeExtractor { else if (typeHierarchy.size() <= 1) { getTypeHierarchy(typeHierarchy, clazz, Object.class); } - + List<Field> fields = getAllDeclaredFields(clazz, false); if (fields.size() == 0) { LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. 
Will be handled as GenericType"); @@ -1822,7 +1972,7 @@ public class TypeExtractor { LOG.info("The default constructor of " + clazz + " should be Public to be used as a POJO."); return null; } - + // everything is checked, we return the pojo return pojoType; } @@ -1870,7 +2020,7 @@ public class TypeExtractor { } return null; } - + private static boolean hasFieldWithSameName(String name, List<Field> fields) { for(Field field : fields) { if(name.equals(field.getName())) { @@ -1879,7 +2029,7 @@ public class TypeExtractor { } return false; } - + private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) { for (int j = 0; j < pojoInfo.getArity(); j++) { PojoField pf = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(j); @@ -1911,20 +2061,20 @@ public class TypeExtractor { Tuple t = (Tuple) value; int numFields = t.getArity(); if(numFields != countFieldsInClass(value.getClass())) { - // not a tuple since it has more fields. + // not a tuple since it has more fields. return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo here, because // there is currently no other type that can handle such a class. } - + TypeInformation<?>[] infos = new TypeInformation[numFields]; for (int i = 0; i < numFields; i++) { Object field = t.getField(i); - + if (field == null) { throw new InvalidTypesException("Automatic type extraction is not possible on candidates with null values. 
" + "Please specify the types directly."); } - + infos[i] = privateGetForObject(field); } return new TupleTypeInfo(value.getClass(), infos); @@ -2013,10 +2163,10 @@ public class TypeExtractor { static void validateIfWritable(TypeInformation<?> typeInfo, Type type) { try { // try to load the writable type info - + Class<?> writableTypeInfoClass = Class .forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, typeInfo.getClass().getClassLoader()); - + if (writableTypeInfoClass.isAssignableFrom(typeInfo.getClass())) { // this is actually a writable type info // check if the type is a writable http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java index 64ff605..7500d73 100644 --- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java +++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -261,6 +262,18 @@ public class LambdaExtractionTest { Assert.assertTrue(ti instanceof MissingTypeInfo); } + @Test + public void testPartitionerLambda() { + Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % 
numPartitions; + final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + + } + private static class MyType { private int key; http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java index 4eff037..37bf872 100644 --- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java +++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java @@ -53,7 +53,6 @@ public class CEPLambdaTest extends TestLogger { * Tests that a Java8 lambda can be passed as a CEP select function. */ @Test - @Ignore public void testLambdaSelectFunction() { TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class); @@ -81,7 +80,6 @@ public class CEPLambdaTest extends TestLogger { * Tests that a Java8 lambda can be passed as a CEP flat select function. 
*/ @Test - @Ignore public void testLambdaFlatSelectFunction() { TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class); http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 71614cf..3131a94 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -83,8 +83,10 @@ public class PatternStream<T> { TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType( patternSelectFunction, PatternSelectFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{}, inputStream.getType(), null, false); @@ -142,8 +144,10 @@ public class PatternStream<T> { TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternTimeoutFunction, PatternTimeoutFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{}, inputStream.getType(), null, false); @@ -151,8 +155,10 @@ public class PatternStream<T> { TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternSelectFunction, PatternSelectFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{}, inputStream.getType(), null, false); @@ -184,8 +190,10 @@ public class PatternStream<T> { TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternFlatSelectFunction, PatternFlatSelectFunction.class, - 1, 0, + 1, + new int[] {0, 1, 0}, + new int[] {1, 0}, inputStream.getType(), null, false); @@ -244,8 +252,10 @@ public class PatternStream<T> { TypeInformation<L> leftTypeInfo 
= TypeExtractor.getUnaryOperatorReturnType( patternFlatTimeoutFunction, PatternFlatTimeoutFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{2, 0}, inputStream.getType(), null, false); @@ -253,8 +263,10 @@ public class PatternStream<T> { TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternFlatSelectFunction, PatternFlatSelectFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{1, 0}, inputStream.getType(), null, false); http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 69ba42f..950b5da 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -232,7 +232,7 @@ public class CEPMigration11to13Test { NullByteKeySelector keySelector = new NullByteKeySelector(); OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), false, @@ -284,7 +284,7 @@ public class CEPMigration11to13Test { OperatorStateHandles snapshot = harness.snapshot(1L, 1L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( + harness = new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), false, 
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java index 9c4f88e..4cb4e01 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java @@ -73,7 +73,16 @@ public class Translate { Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>) (Class<? extends Vertex>) Vertex.class; TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(0); - TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1}, + oldType, + null, + false); TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1); TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); @@ -148,7 +157,16 @@ public class Translate { Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>) (Class<? 
extends Edge>) Edge.class; TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(0); - TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[] {0}, + new int[] {1}, + oldType, + null, + false); TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(2); TupleTypeInfo<Edge<NEW, EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType); @@ -225,7 +243,16 @@ public class Translate { Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>) (Class<? extends Vertex>) Vertex.class; TypeInformation<K> idType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(0); TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(1); - TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1}, + oldType, + null, + false); TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, idType, newType); @@ -300,7 +327,16 @@ public class Translate { Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>) (Class<? 
extends Edge>) Edge.class; TypeInformation<K> idType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(0); TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(2); - TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1}, + oldType, + null, + false); TupleTypeInfo<Edge<K, NEW>> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType); http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 31dbb4f..ae97109 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -236,8 +236,7 @@ public class AllWindowedStream<T, W extends Window> { AllWindowFunction<T, R, W> function) { TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, inType, null, false); + TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType); return reduce(reduceFunction, function, resultType); } @@ -332,8 +331,7 @@ public class AllWindowedStream<T, W extends Window> { ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> function) { - TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType( - function, ProcessAllWindowFunction.class, true, true, input.getType(), null, false); + TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, input.getType()); return reduce(reduceFunction, function, resultType); } @@ -507,12 +505,41 @@ public class AllWindowedStream<T, W extends Window> { TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false); + TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } + private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType( + AllWindowFunction<IN, OUT, ?> function, + TypeInformation<IN> inType) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + AllWindowFunction.class, + 0, + 1, + new int[]{1, 0}, + new int[]{2, 0}, + inType, + null, + false); + } + + private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType( + ProcessAllWindowFunction<IN, OUT, ?> function, + TypeInformation<IN> inType) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + ProcessAllWindowFunction.class, + 0, + 1, + new int[]{1, 0}, + new int[]{2, 0}, + inType, + null, + false); + } + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. 
The output of the window function is @@ -642,8 +669,7 @@ public class AllWindowedStream<T, W extends Window> { TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessAllWindowFunction.class, true, true, aggResultType, null, false); + TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(windowFunction, aggResultType); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } @@ -811,8 +837,7 @@ public class AllWindowedStream<T, W extends Window> { TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, foldAccumulatorType, null, false); + TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, foldAccumulatorType); return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); } @@ -923,8 +948,7 @@ public class AllWindowedStream<T, W extends Window> { TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessAllWindowFunction.class, true, true, foldAccumulatorType, null, false); + TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, foldAccumulatorType); return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); } @@ -1032,8 +1056,7 @@ public class AllWindowedStream<T, W extends Window> { public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) { String callLocation = Utils.getCallLocationName(); function = 
input.getExecutionEnvironment().clean(function); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, getInputType(), null, false); + TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType()); return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation); } @@ -1069,8 +1092,7 @@ public class AllWindowedStream<T, W extends Window> { public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) { String callLocation = Utils.getCallLocationName(); function = input.getExecutionEnvironment().clean(function); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessAllWindowFunction.class, true, true, getInputType(), null, false); + TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, getInputType()); return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation); } @@ -1160,8 +1182,7 @@ public class AllWindowedStream<T, W extends Window> { @Deprecated public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) { TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, inType, null, false); + TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType); return apply(reduceFunction, function, resultType); } http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java index 8461d2c..cb18a3f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java @@ -67,9 +67,16 @@ public class AsyncDataStream { int bufSize, OutputMode mode) { - TypeInformation<OUT> outTypeInfo = - TypeExtractor.getUnaryOperatorReturnType(func, AsyncFunction.class, false, - true, in.getType(), Utils.getCallLocationName(), true); + TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + func, + AsyncFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1, 0}, + in.getType(), + Utils.getCallLocationName(), + true); // create transform AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>( http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index 8dad1cb..4bbb123 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -235,15 +235,12 @@ public class CoGroupedStreams<T1, T2> { */ public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) { - TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - CoGroupFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "CoGroup", - false); + TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes( + function, + input1.getType(), 
+ input2.getType(), + "CoGroup", + false); return apply(function, resultType); } http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index 0b882c8..e244bd2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -203,9 +203,19 @@ public class ConnectedStreams<IN1, IN2> { */ public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) { - TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper, - CoMapFunction.class, false, true, getType1(), getType2(), - Utils.getCallLocationName(), true); + TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + coMapper, + CoMapFunction.class, + 0, + 1, + 2, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper))); @@ -227,9 +237,19 @@ public class ConnectedStreams<IN1, IN2> { public <R> SingleOutputStreamOperator<R> flatMap( CoFlatMapFunction<IN1, IN2, R> coFlatMapper) { - TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper, - CoFlatMapFunction.class, false, true, getType1(), getType2(), - Utils.getCallLocationName(), true); + TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + coFlatMapper, + CoFlatMapFunction.class, + 0, + 1, + 2, + TypeExtractor.NO_INDEX, + 
TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper))); } @@ -254,9 +274,19 @@ public class ConnectedStreams<IN1, IN2> { public <R> SingleOutputStreamOperator<R> process( CoProcessFunction<IN1, IN2, R> coProcessFunction) { - TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction, - CoProcessFunction.class, false, true, getType1(), getType2(), - Utils.getCallLocationName(), true); + TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + coProcessFunction, + CoProcessFunction.class, + 0, + 1, + 2, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); return process(coProcessFunction, outTypeInfo); } http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 0cdc9a1..66cd8e6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -574,13 +574,15 @@ public class DataStream<T> { public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) { TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType( - processFunction, - ProcessFunction.class, - false, - true, - getType(), - Utils.getCallLocationName(), - true); + processFunction, + ProcessFunction.class, + 0, + 1, + new int[]{0}, + new int[]{2, 0}, + 
getType(), + Utils.getCallLocationName(), + true); return process(processFunction, outType); } http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java index e1ffe86..f23ebcf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -221,14 +221,18 @@ public class JoinedStreams<T1, T2> { */ public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) { TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - JoinFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "Join", - false); + function, + JoinFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + TypeExtractor.NO_INDEX, + input1.getType(), + input2.getType(), + "Join", + false); return apply(function, resultType); } @@ -300,14 +304,18 @@ public class JoinedStreams<T1, T2> { */ public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) { TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - FlatJoinFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "Join", - false); + function, + FlatJoinFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + new int[]{2, 0}, + input1.getType(), + input2.getType(), + "Join", + false); return apply(function, resultType); } http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 698deb8..851e614 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -267,13 +267,15 @@ public class KeyedStream<T, KEY> extends DataStream<T> { public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) { TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType( - processFunction, - ProcessFunction.class, - false, - true, - getType(), - Utils.getCallLocationName(), - true); + processFunction, + ProcessFunction.class, + 0, + 1, + new int[]{0}, + new int[]{2, 0}, + getType(), + Utils.getCallLocationName(), + true); return process(processFunction, outType); } http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index f8a1914..a795064 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -295,8 +295,7 @@ public class WindowedStream<T, K, W extends Window> { LegacyWindowOperatorType legacyWindowOpType) { TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, 
true, inType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType); return reduce(reduceFunction, function, resultType, legacyWindowOpType); } @@ -396,8 +395,7 @@ public class WindowedStream<T, K, W extends Window> { */ @PublicEvolving public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) { - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessWindowFunction.class, true, true, input.getType(), null, false); + TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, input.getType(), null); return reduce(reduceFunction, function, resultType); } @@ -544,8 +542,7 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, foldAccumulatorType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, foldAccumulatorType); return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); } @@ -663,8 +660,7 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false); + TypeInformation<R> windowResultType = getProcessWindowFunctionReturnType(windowFunction, foldResultType, Utils.getCallLocationName()); return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType); } @@ -852,8 +848,7 @@ public class WindowedStream<T, K, W extends Window> { 
TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, WindowFunction.class, true, true, aggResultType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(windowFunction, aggResultType); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } @@ -981,12 +976,42 @@ public class WindowedStream<T, K, W extends Window> { TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false); + TypeInformation<R> resultType = getProcessWindowFunctionReturnType(windowFunction, aggResultType, null); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } + private static <IN, OUT, KEY> TypeInformation<OUT> getWindowFunctionReturnType( + WindowFunction<IN, OUT, KEY, ?> function, + TypeInformation<IN> inType) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + WindowFunction.class, + 0, + 1, + new int[]{2, 0}, + new int[]{3, 0}, + inType, + null, + false); + } + + private static <IN, OUT, KEY> TypeInformation<OUT> getProcessWindowFunctionReturnType( + ProcessWindowFunction<IN, OUT, KEY, ?> function, + TypeInformation<IN> inType, + String functionName) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + ProcessWindowFunction.class, + 0, + 1, + new int[]{2, 0}, + new int[]{3, 0}, + inType, + functionName, + false); + } + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. 
The output of the window function is @@ -1094,8 +1119,7 @@ public class WindowedStream<T, K, W extends Window> { * @return The data stream that is the result of applying the window function to the window. */ public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) { - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, getInputType(), null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, getInputType()); return apply(function, resultType); } @@ -1131,8 +1155,7 @@ public class WindowedStream<T, K, W extends Window> { */ @PublicEvolving public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) { - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessWindowFunction.class, true, true, getInputType(), null, false); + TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, getInputType(), null); return process(function, resultType); } @@ -1231,8 +1254,7 @@ public class WindowedStream<T, K, W extends Window> { @Deprecated public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) { TypeInformation<T> inType = input.getType(); - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, inType, null, false); + TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType); return apply(reduceFunction, function, resultType); }
