Repository: flink
Updated Branches:
  refs/heads/master 9c8747d38 -> 76da44283


[FLINK-3179] [dataSet][optimizer] Log a WARN message if combiner is not added 
in front of PartitionOperator

This closes #1822
This closes #1553


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76da4428
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76da4428
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76da4428

Branch: refs/heads/master
Commit: 76da442837d8606d8c4c4d8367b29fd79dc5ba35
Parents: 22d327f
Author: ramkrishna <[email protected]>
Authored: Mon Mar 21 17:03:42 2016 +0530
Committer: Fabian Hueske <[email protected]>
Committed: Tue Mar 22 13:53:58 2016 +0100

----------------------------------------------------------------------
 .../optimizer/operators/GroupReduceWithCombineProperties.java | 7 +++++++
 .../apache/flink/optimizer/operators/ReduceProperties.java    | 7 +++++++
 2 files changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76da4428/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index a4f295d..888b670 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.costs.Costs;
 import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.PartitionNode;
 import org.apache.flink.optimizer.dag.SingleInputNode;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.LocalProperties;
@@ -39,8 +40,11 @@ import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class GroupReduceWithCombineProperties extends 
OperatorDescriptorSingle {
+       private static final Logger LOG = 
LoggerFactory.getLogger(GroupReduceWithCombineProperties.class);
        
        private final Ordering ordering;                // ordering that we 
need to use if an additional ordering is requested 
        
@@ -90,6 +94,9 @@ public final class GroupReduceWithCombineProperties extends 
OperatorDescriptorSi
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
                if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
+                       if(in.getSource().getOptimizerNode() instanceof 
PartitionNode) {
+                               LOG.warn("Cannot automatically inject combiner 
for GroupReduceFunction. Please add an explicit combiner with combineGroup() in 
front of the partition operator.");
+                       }
                        // adjust a sort (changes grouping, so it must be for 
this driver to combining sort
                        if (in.getLocalStrategy() == LocalStrategy.SORT) {
                                if 
(!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/76da4428/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index 81afe1e..78007fe 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.PartitionNode;
 import org.apache.flink.optimizer.dag.ReduceNode;
 import org.apache.flink.optimizer.dag.SingleInputNode;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -37,8 +38,11 @@ import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class ReduceProperties extends OperatorDescriptorSingle {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ReduceProperties.class);
        
        private final Partitioner<?> customPartitioner;
        
@@ -61,6 +65,9 @@ public final class ReduceProperties extends 
OperatorDescriptorSingle {
                if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
                                (node.getBroadcastConnections() != null && 
!node.getBroadcastConnections().isEmpty()))
                {
+                       if(in.getSource().getOptimizerNode() instanceof 
PartitionNode) {
+                               LOG.warn("Cannot automatically inject combiner 
for ReduceFunction. Please add an explicit combiner with combineGroup() in 
front of the partition operator.");
+                       }
                        return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")", in,
                                                                                
        DriverStrategy.SORTED_REDUCE, this.keyList);
                }

Reply via email to