[ 
https://issues.apache.org/jira/browse/FLINK-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16052877#comment-16052877
 ] 

ASF GitHub Bot commented on FLINK-6783:
---------------------------------------

Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4039#discussion_r122570865
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java ---
    @@ -161,6 +164,77 @@ public static LambdaExecutable checkAndExtractLambda(Function function) throws T
        }
     
        /**
    +    * Extracts type from given index from lambda. It supports nested types.
    +    *
    +    * @param exec lambda function to extract the type from
    +    * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
    +    * @param paramLen count of total parameters of the lambda (including closure parameters)
    +    * @param baseParametersLen count of lambda interface parameters (without closure parameters)
    +    * @return extracted type
    +    */
    +   public static Type extractTypeFromLambda(
    +           LambdaExecutable exec,
    +           int[] lambdaTypeArgumentIndices,
    +           int paramLen,
    +           int baseParametersLen) {
    +           Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
    +           for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
    +                   output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
    +           }
    +           return output;
    +   }
    +
    +   /**
    +    * * This method extracts the n-th type argument from the given type. An InvalidTypesException
    +    * is thrown if the type does not have any type arguments or if the index exceeds the number
    +    * of type arguments.
    +    *
    +    * @param t Type to extract the type arguments from
    +    * @param index Index of the type argument to extract
    +    * @return The extracted type argument
    +    * @throws InvalidTypesException if the given type does not have any type arguments or if the
    +    * index exceeds the number of type arguments.
    +    */
    +   public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
    +           if (t instanceof ParameterizedType) {
    +                   Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
    +
    +                   if (index < 0 || index >= actualTypeArguments.length) {
    +                           throw new InvalidTypesException("Cannot extract the type argument with index " +
    +                                   index + " because the type has only " + actualTypeArguments.length +
    +                                   " type arguments.");
    +                   } else {
    +                           return actualTypeArguments[index];
    +                   }
    +           } else {
    +                   throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
    +           }
    +   }
    +
    +   /**
    +    * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object,
    +    * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class.
    +    *
    +    * @param baseClass
    +    * @throws InvalidTypesException if the given class does not implement
    +    * @return
    +    */
    +   public static Method getSingleAbstractMethod(Class<?> baseClass) {
    +           Method sam = null;
    +           for (Method method : baseClass.getMethods()) {
    +                   if (Modifier.isAbstract(method.getModifiers())) {
    +                           if (sam == null) {
    +                                   sam = method;
    +                           } else {
    +                                   throw new InvalidTypesException(
    +                                           "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM.");
    --- End diff --
    
    This message seems to be inexact: the exception is thrown when a second
    abstract method is found, whereas if there is no SAM at all, sam would
    still be null upon returning from the method.
    I suggest changing the message and adding a check (for null sam) prior to
    returning.
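
A minimal sketch of what the suggested change could look like, not the code that ended up in the pull request. To keep it compilable without Flink on the classpath, `InvalidTypesException` is declared here as a local stand-in, and the class name `SamLookup` and the wording of both messages are assumptions made for illustration only.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class SamLookup {

    // Local stand-in for Flink's InvalidTypesException (assumption: keeps the sketch self-contained).
    static final class InvalidTypesException extends RuntimeException {
        InvalidTypesException(String message) {
            super(message);
        }
    }

    // Returns the only public abstract method of baseClass, or throws if there is none or more than one.
    static Method getSingleAbstractMethod(Class<?> baseClass) {
        Method sam = null;
        for (Method method : baseClass.getMethods()) {
            if (Modifier.isAbstract(method.getModifiers())) {
                if (sam != null) {
                    // A second abstract method was found: the message says so explicitly.
                    throw new InvalidTypesException(
                        "Given class: " + baseClass + " is not a FunctionalInterface. It has more than one abstract method.");
                }
                sam = method;
            }
        }
        if (sam == null) {
            // The null check suggested in the review: no abstract method was found at all.
            throw new InvalidTypesException(
                "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM.");
        }
        return sam;
    }

    public static void main(String[] args) {
        // Runnable declares exactly one abstract method, run().
        System.out.println(getSingleAbstractMethod(Runnable.class));
    }
}
```

One caveat of this simple check: an interface that redeclares a public `Object` method as abstract (for example `equals` in `java.util.Comparator`) would be reported as having more than one abstract method, although it is a functional interface by the language rules.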


> Wrongly extracted TypeInformations for WindowedStream::aggregate
> ----------------------------------------------------------------
>
>                 Key: FLINK-6783
>                 URL: https://issues.apache.org/jira/browse/FLINK-6783
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataStream API
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.3.1, 1.4.0
>
>
> The following test fails because the output type of {{AggregateFunction}} is
> extracted incorrectly:
> {code}
> @Test
> public void testAggregateWithWindowFunctionDifferentResultTypes() throws Exception {
>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>       DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
>       DataStream<Tuple3<String, String, Integer>> window = source
>               .keyBy(new TupleKeySelector())
>               .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
>               .aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
>                       @Override
>                       public Tuple2<String, Integer> createAccumulator() {
>                               return Tuple2.of("", 0);
>                       }
>                       @Override
>                       public void add(
>                               Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
>                       }
>                       @Override
>                       public String getResult(Tuple2<String, Integer> accumulator) {
>                               return accumulator.f0;
>                       }
>                       @Override
>                       public Tuple2<String, Integer> merge(
>                               Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
>                               return Tuple2.of("", 0);
>                       }
>               }, new WindowFunction<String, Tuple3<String, String, Integer>, String, TimeWindow>() {
>                       @Override
>                       public void apply(
>                               String s,
>                               TimeWindow window,
>                               Iterable<String> input,
>                               Collector<Tuple3<String, String, Integer>> out) throws Exception {
>                               out.collect(Tuple3.of("", "", 0));
>                       }
>               });
>       OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
>               (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
>       OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
>       Assert.assertTrue(operator instanceof WindowOperator);
>       WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
>               (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
>       Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
>       Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
>       Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
>       processElementAndEnsureOutput(
>               operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
> }
> {code}
> The test fails with the following exception:
> {code}
> org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple type expected.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1157)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:451)
>       at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:855)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowTranslationTest.testAggregateWithWindowFunctionDifferentResultTypes(WindowTranslationTest.java:702)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
>       at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>       at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple type expected.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1204)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1154)
>       ... 25 more
> {code}
> I tracked down the issue: the cause is incorrect handling of
> {{outputTypeArgumentIndex}} in {{TypeExtractor::getUnaryOperatorReturnType}}.
> My proposal is to remove or deprecate the version of
> {{TypeExtractor::getUnaryOperatorReturnType}} that accepts {{hasIterable}}
> and {{hasCollector}} as parameters, and to change all invocations to pass the
> index of the output type explicitly (after fixing the
> {{outputTypeArgumentIndex}} handling at line {{TypeExtractor:455}}).
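
To illustrate what addressing a type by an explicit index means, here is a small sketch that uses only plain Java reflection. It is not Flink's `TypeExtractor`: the interface `Agg<IN, ACC, OUT>` is a hypothetical stand-in shaped like `AggregateFunction`, `Map.Entry` stands in for `Tuple2`, and `IllegalArgumentException` replaces `InvalidTypesException`. The helper `extractNested` is an illustrative name for following a path of indices, similar in spirit to the loop in `extractTypeFromLambda` from the diff.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

final class TypeArgumentSketch {

    // Hypothetical stand-in for an interface shaped like AggregateFunction<IN, ACC, OUT>.
    interface Agg<IN, ACC, OUT> {
        OUT getResult(ACC accumulator);
    }

    // Returns the index-th type argument of t, or throws if t is not parameterized or the index is out of range.
    static Type extractTypeArgument(Type t, int index) {
        if (!(t instanceof ParameterizedType)) {
            throw new IllegalArgumentException("The given type " + t + " is not a parameterized type.");
        }
        Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
        if (index < 0 || index >= actualTypeArguments.length) {
            throw new IllegalArgumentException("Cannot extract the type argument with index " + index
                + " because the type has only " + actualTypeArguments.length + " type arguments.");
        }
        return actualTypeArguments[index];
    }

    // Follows a path of indices into nested type arguments, one level per index.
    static Type extractNested(Type t, int... indices) {
        Type output = t;
        for (int index : indices) {
            output = extractTypeArgument(output, index);
        }
        return output;
    }

    // Builds an anonymous implementation and returns its generic interface type.
    static Type aggregateType() {
        Agg<Map.Entry<String, Integer>, Map.Entry<String, Integer>, String> agg =
            new Agg<Map.Entry<String, Integer>, Map.Entry<String, Integer>, String>() {
                @Override
                public String getResult(Map.Entry<String, Integer> accumulator) {
                    return accumulator.getKey();
                }
            };
        return agg.getClass().getGenericInterfaces()[0];
    }

    public static void main(String[] args) {
        Type t = aggregateType();
        // Index 2 addresses OUT; the path {0, 1} addresses the second argument of IN.
        System.out.println(extractTypeArgument(t, 2));
        System.out.println(extractNested(t, 0, 1));
    }
}
```

With the output position passed as an index, the caller states directly which type argument is the result type (index 2 for `OUT` in this stand-in) instead of having it inferred from flags.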



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
