Gabor Gevay created FLINK-4578:
----------------------------------
Summary: AggregateOperator incorrectly sets ForwardedField with
nested composite types
Key: FLINK-4578
URL: https://issues.apache.org/jira/browse/FLINK-4578
Project: Flink
Issue Type: Bug
Components: DataSet API
Reporter: Gabor Gevay
When an aggregation is called on a grouped DataSet,
{{AggregateOperator.translateToDataFlow}} tries to determine whether the field
that is being aggregated is the same field that the grouping is based on. If
this is not the case, then it adds the ForwardedField property for the key
field.
However, the mechanism that makes this decision breaks when nested composite
types are involved: it gets the key positions with
{{getKeys().computeLogicalKeyPositions()}}, which returns the _flat_ positions,
whereas the position of the field to aggregate is counted only on the outer
type, so the two sets of indices are not directly comparable.
Example code: https://github.com/ggevay/flink/tree/agg-bad-forwarded-fields
Here, I have changed the WordCount example to have the type
{{Tuple3<Tuple2<Byte,Byte>, String, Integer>}}, and to do {{.groupBy(1).sum(2)}}
(which groups by the String field and sums the Integer field). If you set a
breakpoint in {{AggregateOperator.translateToDataFlow}}, you can see that
{{logicalKeyPositions}} contains 2, and {{fields}} also contains 2, which
causes {{keyFieldUsedInAgg}} to be erroneously set to true. The problem is
caused by the Tuple2 being counted as 2 fields in {{logicalKeyPositions}}, but
as only 1 field in {{fields}}: the String key has flat position 2, while the
Integer field has outer position 2 (flat position 3).
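The index mismatch can be reproduced in isolation with a minimal sketch in plain Java, without any Flink dependency. The class and helper names ({{FlatPositionSketch}}, {{toFlatPosition}}, {{contains}}) and the arity array are made up for illustration and are not Flink APIs. Comparing both indices in flat space is only one possible way to make the check consistent, not necessarily how the fix should look.

```java
public class FlatPositionSketch {

    /**
     * Hypothetical helper: flat position of the first leaf of an outer field.
     * arities[i] is the number of flat (leaf) fields that outer field i occupies.
     */
    public static int toFlatPosition(int[] arities, int outerPos) {
        int flat = 0;
        for (int i = 0; i < outerPos; i++) {
            flat += arities[i];
        }
        return flat;
    }

    /** Hypothetical helper: true if pos occurs in positions. */
    public static boolean contains(int[] positions, int pos) {
        for (int p : positions) {
            if (p == pos) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Tuple3<Tuple2<Byte,Byte>, String, Integer>:
        // the nested Tuple2 occupies 2 flat fields, the others 1 each.
        int[] arities = {2, 1, 1};

        int groupField = 1; // .groupBy(1), the String field (outer index)
        int aggField = 2;   // .sum(2), the Integer field (outer index)

        // Key positions are flat: the String field sits at flat position 2.
        int[] logicalKeyPositions = {toFlatPosition(arities, groupField)};

        // Mismatched check: flat key position compared with an outer index.
        boolean mismatched = contains(logicalKeyPositions, aggField);

        // Consistent check: both sides expressed as flat positions.
        boolean consistent =
                contains(logicalKeyPositions, toFlatPosition(arities, aggField));

        System.out.println("outer index vs flat positions: " + mismatched); // true
        System.out.println("flat index vs flat positions:  " + consistent); // false
    }
}
```

With the mismatched comparison the key field appears to be used in the aggregation (2 == 2), although the Integer field actually has flat position 3.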