[ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126305#comment-15126305 ]
ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1553#issuecomment-177997324

    The `GroupReduceWithCombineProperties.instantiate()` method checks the
    shipping strategy of the input channel. In case of the `WordCount` example
    *without* explicit hash combiner, the shipping strategy is `PARTITION_HASH`
    and the `else` branch will inject a combiner. If you add an explicit
    partition operator, the input shipping strategy of the Reduce operator is
    `FORWARD`, so the `if` branch is executed and does not add a combiner.

    Hence, the logic has to go into the `if` branch and not into the `else`
    branch. Or, even better, add an additional condition to the `if` case,
    `!(in.getSource().getOptimizerNode() instanceof PartitionNode)`, and add an
    `else if` branch to handle the special case of the explicit partition
    operator.

> Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or
> GroupReducer is explicitly partitioned, as in the following example:
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such
> that a combiner is injected in front of a {{PartitionPlanNode}} if it is the
> input of a Reduce or GroupReduce operator. This should only happen if the
> Reducer is the only successor of the Partition operator.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
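
The branch logic discussed in the comment can be sketched as a small, self-contained Java model. This is only an illustration under stated assumptions: `CombinerPlacementSketch`, its nested enums, and `decide()` are hypothetical names that do not exist in Flink, and the real `instantiate()` methods work on the optimizer's channel and plan-node types rather than on plain flags. The sketch encodes three cases: a hash-partitioned input gets a combiner, a `FORWARD` input from a non-partition source gets none, and a `FORWARD` input from an explicit partition operator gets a combiner in front of that operator only when the reducer is its sole successor.

```java
// Hypothetical, self-contained model of the decision; none of these names are Flink classes.
final class CombinerPlacementSketch {

    /** Simplified stand-in for the shipping strategy of the reducer's input channel. */
    enum ShipStrategy { FORWARD, PARTITION_HASH }

    /** Where, if anywhere, a combiner would be injected. */
    enum Placement { NONE, BEFORE_SHUFFLE, BEFORE_PARTITION_OPERATOR }

    private CombinerPlacementSketch() {}

    /**
     * Mirrors the if / else-if / else structure proposed in the comment.
     *
     * @param inputStrategy             shipping strategy of the reducer's input
     * @param sourceIsPartitionOperator true if the input comes from an explicit partition operator
     * @param reducerIsOnlySuccessor    true if the reducer is the partition operator's only successor
     */
    static Placement decide(ShipStrategy inputStrategy,
                            boolean sourceIsPartitionOperator,
                            boolean reducerIsOnlySuccessor) {
        if (inputStrategy == ShipStrategy.FORWARD && !sourceIsPartitionOperator) {
            // Plain FORWARD input: no shuffle in front of the reducer, so no combiner.
            return Placement.NONE;
        } else if (inputStrategy == ShipStrategy.FORWARD) {
            // Special case: FORWARD input fed by an explicit partition operator.
            // Combine before the partition step, but only if nobody else reads its output.
            return reducerIsOnlySuccessor
                    ? Placement.BEFORE_PARTITION_OPERATOR
                    : Placement.NONE;
        } else {
            // Optimizer-chosen hash partitioning: combine before the shuffle.
            return Placement.BEFORE_SHUFFLE;
        }
    }
}
```

For the example in the issue description (`partitionByHash(0)` followed by `groupBy(0).sum(1)`), the model corresponds to `decide(ShipStrategy.FORWARD, true, true)`, which selects the placement in front of the partition operator.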