[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15183019#comment-15183019
 ] 

ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1553#discussion_r55205324
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
    @@ -126,12 +117,66 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
                        }
                        toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
                                                                                in.getLocalStrategySortOrder());
    -
                        return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
                                                                                        toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
                }
        }
     
    +   private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {
    +           // Inject a combiner before the partition node
    +           Channel channelWithPartitionSrc = new Channel(in.getSource());
    +           GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
    +           combinerNode.setParallelism(in.getSource().getParallelism());
    +           if(in.getSource().getInputs().iterator().hasNext()) {
    +                   Channel oldChannelToPartitioner = channelWithPartitionSrc.getSource().getInputs().iterator().next();
    +                   Channel toCombiner = new Channel(oldChannelToPartitioner.getSource());
    +            // A combiner plan node is created with the channel as input that has the partitioner as the target
    +                   toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
    +            SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
    +                .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
    +            setCombinerProperties(in, oldChannelToPartitioner, combiner);
    +
    +                   Channel toPartitioner = new Channel(combiner);
    +                   // Set the actual partitioner node's strategy key and strategy order
    +                   toPartitioner.setShipStrategy(oldChannelToPartitioner.getShipStrategy(), oldChannelToPartitioner.getShipStrategyKeys(),
    +                           oldChannelToPartitioner.getShipStrategySortOrder(), oldChannelToPartitioner.getDataExchangeMode());
    +            // Create the partition single input plan node from the existing partition node
    +            PlanNode partitionplanNode = in.getSource().getPlanNode();
    +            SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(),
    +                toPartitioner, partitionplanNode.getDriverStrategy());
    +            partition.setCosts(partitionplanNode.getNodeCosts());
    +            partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties());
    +            // Create a reducer such that the input of the reducer is the partition node
    +            Channel toReducer = new Channel(partition);
    +            toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
    +                in.getShipStrategySortOrder(), in.getDataExchangeMode());
    +            return getReducerSingleInputPlanNode(toReducer, node);
    +        } else {
    +            return getReducerSingleInputPlanNode(in, node);
    --- End diff --
    
    `in.getSource()` is the partitioner and should thus have exactly one input.
    If this is not the case, something is wrong and we should throw a
    `CompilerException` instead of silently falling back to the plain reducer.
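
To make the suggestion concrete, here is a minimal, self-contained sketch of an "exactly one input" check. It does not use Flink's classes: the nested `CompilerException` is a stand-in for Flink's exception of the same name so the example compiles on its own, and the class name `SingleInputCheck` and the helper `getSingleInput` are hypothetical names chosen for illustration, not part of the pull request.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

public class SingleInputCheck {

    // Stand-in for Flink's CompilerException (assumption: an unchecked
    // exception with a message constructor), declared here so the sketch
    // is self-contained.
    public static class CompilerException extends RuntimeException {
        public CompilerException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: returns the only element of 'inputs' and throws
    // if there are zero or more than one, instead of silently falling back.
    public static <T> T getSingleInput(Iterable<T> inputs) {
        Iterator<T> it = inputs.iterator();
        if (!it.hasNext()) {
            throw new CompilerException("Partition node has no input.");
        }
        T only = it.next();
        if (it.hasNext()) {
            throw new CompilerException("Partition node has more than one input.");
        }
        return only;
    }

    public static void main(String[] args) {
        // Exactly one input: the element is returned.
        System.out.println(getSingleInput(Collections.singletonList("channel")));

        // Zero or several inputs: the check fails loudly.
        try {
            getSingleInput(Arrays.asList("a", "b"));
        } catch (CompilerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the method under review, a check of this kind would take the place of the `hasNext()` test and its `else` branch, so an unexpected plan shape surfaces as an error rather than as a plan without a combiner.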


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned, as in the following example:
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combiner is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen if the 
> Reducer is the only successor of the Partition operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
