Repository: flink
Updated Branches:
  refs/heads/master 9c8747d38 -> 76da44283
[FLINK-3179] [dataSet][optimizer] Log a WARN message if combiner is not added in front of PartitionOperator

This closes #1822
This closes #1553

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76da4428
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76da4428
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76da4428

Branch: refs/heads/master
Commit: 76da442837d8606d8c4c4d8367b29fd79dc5ba35
Parents: 22d327f
Author: ramkrishna <[email protected]>
Authored: Mon Mar 21 17:03:42 2016 +0530
Committer: Fabian Hueske <[email protected]>
Committed: Tue Mar 22 13:53:58 2016 +0100

----------------------------------------------------------------------
 .../optimizer/operators/GroupReduceWithCombineProperties.java | 7 +++++++
 .../apache/flink/optimizer/operators/ReduceProperties.java | 7 +++++++
 2 files changed, 14 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/76da4428/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index a4f295d..888b670 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.costs.Costs;
 import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.PartitionNode;
 import org.apache.flink.optimizer.dag.SingleInputNode;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.LocalProperties;
@@ -39,8 +40,11 @@ import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class GroupReduceWithCombineProperties extends OperatorDescriptorSingle {
+	private static final Logger LOG = LoggerFactory.getLogger(GroupReduceWithCombineProperties.class);
 
 	private final Ordering ordering; // ordering that we need to use if an additional ordering is requested
 
@@ -90,6 +94,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
+			if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+				LOG.warn("Cannot automatically inject combiner for GroupReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
+			}
 			// adjust a sort (changes grouping, so it must be for this driver to combining sort
 			if (in.getLocalStrategy() == LocalStrategy.SORT) {
 				if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/76da4428/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index 81afe1e..78007fe 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.PartitionNode;
 import org.apache.flink.optimizer.dag.ReduceNode;
 import org.apache.flink.optimizer.dag.SingleInputNode;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -37,8 +38,11 @@ import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class ReduceProperties extends OperatorDescriptorSingle {
+	private static final Logger LOG = LoggerFactory.getLogger(ReduceProperties.class);
 
 	private final Partitioner<?> customPartitioner;
 
@@ -61,6 +65,9 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
 				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
 		{
+			if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+				LOG.warn("Cannot automatically inject combiner for ReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
+			}
 			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
 											DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
