[ https://issues.apache.org/jira/browse/FLINK-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16034771#comment-16034771 ]
ASF GitHub Bot commented on FLINK-6783:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/4039#discussion_r119868212
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -422,37 +521,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 			throw new InvalidTypesException("Internal error occurred.", e);
 		}
 		if (exec != null) {
+			Preconditions.checkArgument(
+				lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
+				"Indices for input type arguments within lambda not provided");
+			Preconditions.checkArgument(
+				lambdaOutputTypeArgumentIndices != null,
+				"Indices for output type arguments within lambda not provided");
 			// check for lambda type erasure
 			validateLambdaGenericParameters(exec);
 			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-			final int paramLen = exec.getParameterTypes().length - 1;
+			final int paramLen = exec.getParameterTypes().length;
+
+			final Method sam = getSingleAbstractMethod(baseClass);
+			final int baseParametersLen = sam.getParameterTypes().length;
--- End diff ---
Kind of. baseParametersLen is the number of parameters that the interface's method declares, so it is the "array" that the indices point into. paramLen is the number of parameters of the lambda's synthetic method, which can carry additional parameters, e.g. from the closure. We therefore need both lengths to index the arguments correctly.
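To make the relationship between the two lengths concrete, here is a small standalone sketch (the class and method names are mine, not Flink's) of shifting an index from the interface method's parameter list to the lambda's synthetic method, assuming the compiler prepends any captured variables before the declared parameters:

```java
public class LambdaIndexSketch {

    // Maps an index into the SAM (single abstract method) parameter list to the
    // corresponding index in the lambda's synthetic method, assuming extra
    // parameters (e.g. captured locals) are prepended before the declared ones.
    // paramLen          = number of parameters of the synthetic lambda method
    // baseParametersLen = number of parameters of the interface's method
    static int toLambdaIndex(int samParamIndex, int paramLen, int baseParametersLen) {
        return paramLen - baseParametersLen + samParamIndex;
    }

    public static void main(String[] args) {
        // A Function<T, R> lambda capturing one local variable compiles to a
        // synthetic method with 2 parameters: (captured, t). The SAM's single
        // parameter (index 0) therefore lives at synthetic index 1.
        System.out.println(toLambdaIndex(0, 2, 1)); // prints 1
    }
}
```

This is why indexing "from behind" works: the offset between the two lists is constant regardless of how many locals the lambda captures.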
> Wrongly extracted TypeInformations for WindowedStream::aggregate
> ----------------------------------------------------------------
>
> Key: FLINK-6783
> URL: https://issues.apache.org/jira/browse/FLINK-6783
> Project: Flink
> Issue Type: Bug
> Components: Core, DataStream API
> Affects Versions: 1.3.0, 1.3.1
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Blocker
> Labels: flink-rel-1.3.1-blockers
> Fix For: 1.3.1
>
>
> The following test fails because of the wrongly extracted output type for
> {{AggregateFunction}}:
> {code}
> @Test
> public void testAggregateWithWindowFunctionDifferentResultTypes() throws Exception {
> 	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 	DataStream<Tuple2<String, Integer>> source =
> 		env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
> 
> 	DataStream<Tuple3<String, String, Integer>> window = source
> 		.keyBy(new TupleKeySelector())
> 		.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
> 		.aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
> 			@Override
> 			public Tuple2<String, Integer> createAccumulator() {
> 				return Tuple2.of("", 0);
> 			}
> 
> 			@Override
> 			public void add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
> 			}
> 
> 			@Override
> 			public String getResult(Tuple2<String, Integer> accumulator) {
> 				return accumulator.f0;
> 			}
> 
> 			@Override
> 			public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
> 				return Tuple2.of("", 0);
> 			}
> 		}, new WindowFunction<String, Tuple3<String, String, Integer>, String, TimeWindow>() {
> 			@Override
> 			public void apply(
> 					String s,
> 					TimeWindow window,
> 					Iterable<String> input,
> 					Collector<Tuple3<String, String, Integer>> out) throws Exception {
> 				out.collect(Tuple3.of("", "", 0));
> 			}
> 		});
> 
> 	OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
> 		(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
> 	OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
> 	Assert.assertTrue(operator instanceof WindowOperator);
> 
> 	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
> 		(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
> 	Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
> 	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
> 	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
> 
> 	processElementAndEnsureOutput(
> 		operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
> }
> {code}
> The test results in
> {code}
> org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple type expected.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1157)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:451)
> 	at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:855)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowTranslationTest.testAggregateWithWindowFunctionDifferentResultTypes(WindowTranslationTest.java:702)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> 	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> 	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple type expected.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1204)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1154)
> 	... 25 more
> {code}
> I tracked down the issue: the cause is the wrongly handled
> {{outputTypeArgumentIndex}} in {{TypeExtractor::getUnaryOperatorReturnType}}.
> My proposal is to remove/deprecate the version of
> {{TypeExtractor::getUnaryOperatorReturnType}} that accepts {{hasIterable}}
> and {{hasCollector}} as parameters, and to move all invocations to explicitly
> passing the index of the output type (after fixing the
> {{outputTypeArgumentIndex}} handling in line {{TypeExtractor:455}}).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)