[FLINK-1328] Integrated forwarded Fields into the optimizer (incomplete). This closes #83.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78f41e9c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78f41e9c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78f41e9c Branch: refs/heads/master Commit: 78f41e9cae597deeafdec934d6052ad72db6946f Parents: 82c4200 Author: sebastian kunert <[email protected]> Authored: Sun Aug 31 19:22:56 2014 +0200 Committer: Fabian Hueske <[email protected]> Committed: Wed Jan 28 01:39:00 2015 +0100 ---------------------------------------------------------------------- .../dag/AbstractPartialSolutionNode.java | 5 +- .../flink/compiler/dag/BinaryUnionNode.java | 8 +- .../flink/compiler/dag/BulkIterationNode.java | 7 +- .../apache/flink/compiler/dag/DataSinkNode.java | 8 +- .../flink/compiler/dag/DataSourceNode.java | 5 +- .../apache/flink/compiler/dag/FilterNode.java | 12 +- .../flink/compiler/dag/OptimizerNode.java | 21 +- .../flink/compiler/dag/SingleInputNode.java | 27 +- .../apache/flink/compiler/dag/TwoInputNode.java | 44 +- .../flink/compiler/dag/UnaryOperatorNode.java | 14 +- .../compiler/dag/WorksetIterationNode.java | 9 +- .../dataproperties/GlobalProperties.java | 98 +- .../dataproperties/InterestingProperties.java | 17 +- .../dataproperties/LocalProperties.java | 80 +- .../RequestedGlobalProperties.java | 58 +- .../RequestedLocalProperties.java | 40 +- .../postpass/GenericFlatTypePostPass.java | 5 +- .../apache/flink/compiler/DOPChangeTest.java | 7 +- .../compiler/SemanticPropOptimizerTest.java | 941 +++++++++++++++++++ .../operators/DualInputSemanticProperties.java | 83 +- .../flink/api/common/operators/Ordering.java | 2 +- .../common/operators/SemanticProperties.java | 17 +- .../SingleInputSemanticProperties.java | 47 +- .../java/graph/ConnectedComponents.java | 6 +- .../record/functions/FunctionAnnotation.java | 108 ++- 25 files changed, 1460 insertions(+), 209 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java index 81199a7..c54076d 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.costs.CostEstimator; import org.apache.flink.compiler.plan.PlanNode; @@ -86,8 +87,8 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode { } @Override - public boolean isFieldConstant(int input, int fieldNumber) { - return false; + public SemanticProperties getSemanticProperties() { + return null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java index 9003c92..5d805a9 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java @@ -24,6 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; 
+import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.Union; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.DataStatistics; @@ -252,8 +254,10 @@ public class BinaryUnionNode extends TwoInputNode { protected void readStubAnnotations() {} @Override - public boolean isFieldConstant(int input, int fieldNumber) { - return true; + public SemanticProperties getSemanticProperties() { + DualInputSemanticProperties sprops = new DualInputSemanticProperties(); + sprops.setAllFieldsConstant(true); + return sprops; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java index d500925..18ac4c2 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.base.BulkIterationBase; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.compiler.CompilerException; @@ -182,10 +183,10 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode public String getName() { return "Bulk Iteration"; } - + @Override - public boolean isFieldConstant(int input, int fieldNumber) { - return false; + public SemanticProperties getSemanticProperties() { + return null; } protected void readStubAnnotations() {} 
http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java index 28a4d7c..2d4fb30 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.costs.CostEstimator; @@ -230,9 +231,10 @@ public class DataSinkNode extends OptimizerNode { // -------------------------------------------------------------------------------------------- // Function Annotation Handling // -------------------------------------------------------------------------------------------- - - public boolean isFieldConstant(int input, int fieldNumber) { - return false; + + @Override + public SemanticProperties getSemanticProperties() { + return null; } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java index 
39c09a1..752a763 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.io.NonParallelInput; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.costs.CostEstimator; @@ -193,8 +194,8 @@ public class DataSourceNode extends OptimizerNode { } @Override - public boolean isFieldConstant(int input, int fieldNumber) { - return false; + public SemanticProperties getSemanticProperties() { + return null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java index df304b1..140734c 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java @@ -21,6 +21,8 @@ package org.apache.flink.compiler.dag; import java.util.Collections; import java.util.List; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.base.FilterOperatorBase; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.operators.FilterDescriptor; @@ -47,10 +49,14 @@ public class FilterNode extends SingleInputNode { public String 
getName() { return "Filter"; } - + @Override - public boolean isFieldConstant(int input, int fieldNumber) { - return true; + public SemanticProperties getSemanticProperties() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + sprops.setAllFieldsConstant(true); + + return sprops; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java index 89cc354..75becf8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.flink.api.common.operators.AbstractUdfOperator; import org.apache.flink.api.common.operators.CompilerHints; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.DataStatistics; @@ -268,16 +269,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat */ @Override public abstract void accept(Visitor<OptimizerNode> visitor); - - /** - * Checks whether a field is modified by the user code or whether it is kept unchanged. - * - * @param input The input number. - * @param fieldNumber The position of the field. - * - * @return True if the field is not changed by the user function, false otherwise. 
- */ - public abstract boolean isFieldConstant(int input, int fieldNumber); + + public abstract SemanticProperties getSemanticProperties(); // ------------------------------------------------------------------------ // Getters / Setters @@ -688,8 +681,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat return null; } for (int keyColumn : keyColumns) { - if (!isFieldConstant(input, keyColumn)) { - return null; + FieldSet fs = getSemanticProperties() == null ? null : getSemanticProperties().getForwardFields(input, keyColumn); + + if (fs == null) { + return null; + } else if (!fs.contains(keyColumn)) { + return null; } } return keyColumns; http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java index 730c1bb..e7e82fc 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java @@ -31,8 +31,8 @@ import java.util.Map; import java.util.Set; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SingleInputOperator; -import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.PactCompiler; @@ -144,22 +144,8 @@ public abstract class SingleInputNode extends OptimizerNode { @Override - public boolean isFieldConstant(int input, int fieldNumber) { - if (input != 0) { - throw new IndexOutOfBoundsException(); - } - - SingleInputOperator<?, 
?, ?> c = getPactContract(); - SingleInputSemanticProperties semanticProperties = c.getSemanticProperties(); - - if (semanticProperties != null) { - FieldSet fs; - if ((fs = semanticProperties.getForwardedField(fieldNumber)) != null) { - return fs.contains(fieldNumber); - } - } - - return false; + public SemanticProperties getSemanticProperties() { + return ((SingleInputOperator<?,?,?>) getPactContract()).getSemanticProperties(); } @@ -444,10 +430,11 @@ public abstract class SingleInputNode extends OptimizerNode { LocalProperties lProps = in.getLocalProperties().clone(); gProps = dps.computeGlobalProperties(gProps); lProps = dps.computeLocalProperties(lProps); - + + SemanticProperties props = this.getSemanticProperties(); // filter by the user code field copies - gProps = gProps.filterByNodesConstantSet(this, 0); - lProps = lProps.filterByNodesConstantSet(this, 0); + gProps = gProps.filterBySemanticProperties(props, 0); + lProps = lProps.filterBySemanticProperties(props, 0); // apply node.initProperties(gProps, lProps); http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java index 32f0519..5e9a980 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java @@ -31,10 +31,9 @@ import java.util.Map; import java.util.Set; import org.apache.flink.api.common.operators.DualInputOperator; -import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; -import 
org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.costs.CostEstimator; @@ -554,13 +553,14 @@ public abstract class TwoInputNode extends OptimizerNode { DualInputPlanNode node = operator.instantiate(in1, in2, this); node.setBroadcastInputs(broadcastChannelsCombination); - - GlobalProperties gp1 = in1.getGlobalProperties().clone().filterByNodesConstantSet(this, 0); - GlobalProperties gp2 = in2.getGlobalProperties().clone().filterByNodesConstantSet(this, 1); + + SemanticProperties props = this.getSemanticProperties(); + GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0); + GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1); GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2); - LocalProperties lp1 = in1.getLocalProperties().clone().filterByNodesConstantSet(this, 0); - LocalProperties lp2 = in2.getLocalProperties().clone().filterByNodesConstantSet(this, 1); + LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0); + LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1); LocalProperties locals = operator.computeLocalProperties(lp1, lp2); node.initProperties(combined, locals); @@ -690,37 +690,11 @@ public abstract class TwoInputNode extends OptimizerNode { } } - @Override - public boolean isFieldConstant(int input, int fieldNumber) { - DualInputOperator<?, ?, ?, ?> c = getPactContract(); - DualInputSemanticProperties semanticProperties = c.getSemanticProperties(); - - switch(input) { - case 0: - if (semanticProperties != null) { - FieldSet fs; - if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) { - return fs.contains(fieldNumber); - } - } - break; - case 1: - if(semanticProperties != null) { - FieldSet fs; - if ((fs = 
semanticProperties.getForwardedField2(fieldNumber)) != null) { - return fs.contains(fieldNumber); - } - } - break; - default: - throw new IndexOutOfBoundsException(); - } - - return false; + public SemanticProperties getSemanticProperties() { + return ((DualInputOperator<?, ?, ?, ?>) getPactContract()).getSemanticProperties(); } - // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java index 432f904..aaf0a10 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java @@ -21,6 +21,8 @@ package org.apache.flink.compiler.dag; import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.operators.OperatorDescriptorSingle; @@ -54,12 +56,12 @@ public class UnaryOperatorNode extends SingleInputNode { public String getName() { return this.name; } - - public boolean isFieldConstant(int input, int fieldNumber) { - if (input != 0) { - throw new IndexOutOfBoundsException(); - } - return true; + + @Override + public SemanticProperties getSemanticProperties() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + 
sprops.setAllFieldsConstant(true); + return sprops; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java index ae6bc6b..95fc066 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.base.DeltaIterationBase; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.typeinfo.NothingTypeInfo; @@ -218,12 +219,12 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode public String getName() { return "Workset Iteration"; } - + @Override - public boolean isFieldConstant(int input, int fieldNumber) { - return false; + public SemanticProperties getSemanticProperties() { + return null; } - + protected void readStubAnnotations() {} @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java index 7dedc53..4fe632a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java +++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java @@ -25,10 +25,10 @@ import java.util.Set; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; -import org.apache.flink.compiler.dag.OptimizerNode; import org.apache.flink.compiler.plan.Channel; import org.apache.flink.compiler.util.Utils; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -212,7 +212,19 @@ public class GlobalProperties implements Cloneable { return false; } } - + + public Ordering getOrdering() { + return this.ordering; + } + + public void setOrdering(Ordering ordering) { + this.ordering = ordering; + } + + public void setPartitioningFields(FieldList partitioningFields) { + this.partitioningFields = partitioningFields; + } + public boolean isFullyReplicated() { return this.partitioning == PartitioningProperty.FULL_REPLICATION; } @@ -234,56 +246,86 @@ public class GlobalProperties implements Cloneable { } /** - * Filters these properties by what can be preserved through the given output contract. - * - * @param node The optimizer node. - * @param input The input of the node to filter against. - * @return The adjusted global properties. + * Filters these GlobalProperties by the fields that are constant or forwarded to another output field. + * + * @param props The node representing the contract. + * @param input The index of the input. 
+ * @return The filtered GlobalProperties */ - public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { // check if partitioning survives + FieldList forwardFields = null; + GlobalProperties returnProps = this; + + if (props == null) { + return new GlobalProperties(); + } + if (this.ordering != null) { - for (int col : this.ordering.getInvolvedIndexes()) { - if (!node.isFieldConstant(input, col)) { - return new GlobalProperties(); + Ordering no = new Ordering(); + for (int index : this.ordering.getInvolvedIndexes()) { + forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList(); + if (forwardFields == null) { + returnProps = new GlobalProperties(); + no = null; + break; + } else { + returnProps = returnProps == this ? this.clone() : returnProps; + for (int i = 0; i < forwardFields.size(); i++) { + no.appendOrdering(forwardFields.get(i), this.ordering.getType(index), this.ordering.getOrder(index)); + } } + returnProps.setOrdering(no); } } if (this.partitioningFields != null) { - for (int colIndex : this.partitioningFields) { - if (!node.isFieldConstant(input, colIndex)) { - return new GlobalProperties(); + returnProps = returnProps == this ? this.clone() : returnProps; + returnProps.setPartitioningFields(new FieldList()); + + for (int index : this.partitioningFields) { + forwardFields = props.getForwardFields(input, index) == null ? 
null: props.getForwardFields(input, index).toFieldList(); + if (forwardFields == null) { + returnProps = new GlobalProperties(); + break; + } else { + returnProps.setPartitioningFields(returnProps.getPartitioningFields().addFields(forwardFields)); } } } if (this.uniqueFieldCombinations != null) { HashSet<FieldSet> newSet = new HashSet<FieldSet>(); newSet.addAll(this.uniqueFieldCombinations); - - for (Iterator<FieldSet> combos = newSet.iterator(); combos.hasNext(); ){ + for (Iterator<FieldSet> combos = this.uniqueFieldCombinations.iterator(); combos.hasNext(); ){ FieldSet current = combos.next(); + FieldSet nfs = new FieldSet(); for (Integer field : current) { - if (!node.isFieldConstant(input, field)) { - combos.remove(); + if (props.getForwardFields(input, field) == null) { + newSet.remove(current); + nfs = null; break; + } else { + nfs = nfs.addFields(props.getForwardFields(input, field)); } } + if (nfs != null) { + newSet.remove(current); + newSet.add(nfs); + } } - - if (newSet.size() != this.uniqueFieldCombinations.size()) { - GlobalProperties gp = clone(); - gp.uniqueFieldCombinations = newSet.isEmpty() ? null : newSet; - return gp; - } + + GlobalProperties gp = returnProps.clone(); + gp.uniqueFieldCombinations = newSet.isEmpty() ? 
null : newSet; + return gp; } - + if (this.partitioning == PartitioningProperty.FULL_REPLICATION) { return new GlobalProperties(); } - - return this; + + return returnProps; } - + + public void parameterizeChannel(Channel channel, boolean globalDopChange) { switch (this.partitioning) { case RANDOM: http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java index 9686484..0f60576 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java @@ -23,7 +23,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.compiler.dag.OptimizerNode; +import org.apache.flink.compiler.dag.SingleInputNode; +import org.apache.flink.compiler.dag.TwoInputNode; /** * The interesting properties that a node in the optimizer plan hands to its predecessors. 
It has the @@ -87,19 +90,25 @@ public class InterestingProperties implements Cloneable public Set<RequestedGlobalProperties> getGlobalProperties() { return this.globalProps; } - + public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { InterestingProperties iProps = new InterestingProperties(); - + SemanticProperties props = null; + if (node instanceof SingleInputNode) { + props = node.getSemanticProperties(); + } else if (node instanceof TwoInputNode) { + props = node.getSemanticProperties(); + } + for (RequestedGlobalProperties rgp : this.globalProps) { - RequestedGlobalProperties filtered = rgp.filterByNodesConstantSet(node, input); + RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(props, input); if (filtered != null && !filtered.isTrivial()) { iProps.addGlobalProperties(filtered); } } for (RequestedLocalProperties rlp : this.localProps) { - RequestedLocalProperties filtered = rlp.filterByNodesConstantSet(node, input); + RequestedLocalProperties filtered = rlp.filterBySemanticProperties(props, input); if (filtered != null && !filtered.isTrivial()) { iProps.addLocalProperties(filtered); } http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java index 3c66e56..b06774c 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java @@ -20,12 +20,13 @@ package org.apache.flink.compiler.dataproperties; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import 
org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.compiler.dag.OptimizerNode; /** * This class represents local properties of the data. A local property is a property that exists @@ -126,57 +127,91 @@ public class LocalProperties implements Cloneable { /** * Filters these properties by what can be preserved through a user function's constant fields set. - * - * @param node The optimizer node that potentially modifies the properties. + * + * @param props The optimizer node that potentially modifies the properties. * @param input The input of the node which is relevant. - * - * @return True, if the resulting properties are non trivial. + * + * @return The filtered LocalProperties */ - public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) { // check, whether the local order is preserved Ordering no = this.ordering; FieldList ngf = this.groupedFields; Set<FieldSet> nuf = this.uniqueFields; - + FieldList forwardList = null; + + if (props == null) { + return new LocalProperties(); + } + if (this.ordering != null) { + no = new Ordering(); final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); for (int i = 0; i < involvedIndexes.size(); i++) { - if (!node.isFieldConstant(input, involvedIndexes.get(i))) { - if (i == 0) { + forwardList = props.getForwardFields(input, involvedIndexes.get(i)) == null ? 
null : props.getForwardFields(input, involvedIndexes.get(i)).toFieldList(); + + if (forwardList == null) { + no = null; + ngf = null; + /*if (i == 0) { no = null; ngf = null; } else { no = this.ordering.createNewOrderingUpToIndex(i); ngf = no.getInvolvedIndexes(); - } + }*/ break; + } else { + no.appendOrdering(forwardList.get(0), this.ordering.getType(i), this.ordering.getOrder(i)); + ngf = no.getInvolvedIndexes(); } } } else if (this.groupedFields != null) { // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - if (!node.isFieldConstant(input, index)) { + forwardList = props.getForwardFields(input, index) == null ? null : props.getForwardFields(input, index).toFieldList(); + if (forwardList == null) { ngf = null; + break; + } else if (!forwardList.contains(index)) { + FieldList grouped = new FieldList(); + for (Integer value : ngf.toFieldList()) { + if (value.intValue() == index) { + grouped = grouped.addFields(forwardList); + } else { + grouped = grouped.addField(value); + } + } + ngf = grouped; } } } - - if (this.uniqueFields != null && this.uniqueFields.size() > 0) { - Set<FieldSet> s = new HashSet<FieldSet>(this.uniqueFields); - for (FieldSet fields : this.uniqueFields) { - for (Integer index : fields) { - if (!node.isFieldConstant(input, index)) { - s.remove(fields); + + if (this.uniqueFields != null) { + HashSet<FieldSet> newSet = new HashSet<FieldSet>(); + newSet.addAll(this.uniqueFields); + for (Iterator<FieldSet> combos = this.uniqueFields.iterator(); combos.hasNext(); ){ + FieldSet current = combos.next(); + FieldSet nfs = new FieldSet(); + for (Integer field : current) { + if (props.getForwardFields(input, field) == null) { + newSet.remove(current); + nfs = null; break; + } else { + nfs = nfs.addFields(props.getForwardFields(input, field)); } } + if (nfs != null) { + newSet.remove(current); + newSet.add(nfs); + } } - if (s.size() != this.uniqueFields.size()) { - nuf = s; - } + + nuf = newSet.isEmpty() ? 
null : newSet; } - + if (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) { return this; } else { @@ -187,7 +222,6 @@ public class LocalProperties implements Cloneable { return lp; } } - // -------------------------------------------------------------------------------------------- @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java index 4e9d60a..1320f82 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java @@ -21,9 +21,10 @@ package org.apache.flink.compiler.dataproperties; import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; -import org.apache.flink.compiler.dag.OptimizerNode; import org.apache.flink.compiler.plan.Channel; import org.apache.flink.compiler.util.Utils; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -188,30 +189,63 @@ public final class RequestedGlobalProperties implements Cloneable { this.customPartitioner = null; } + public void setPartitioningFields(FieldSet partitioned) { + this.partitioningFields = partitioned; + } + + public void setPartitioningFields(FieldSet fields, 
PartitioningProperty partitioning) { + this.partitioningFields = fields; + this.partitioning = partitioning; + } + + public void setOrdering(Ordering newOrdering) { + this.ordering = newOrdering; + } + /** * Filters these properties by what can be preserved by the given node when propagated down * to the given input. - * - * @param node The node representing the contract. + * + * @param props The semantic properties of the node representing the contract. * @param input The index of the input. - * @return True, if any non-default value is preserved, false otherwise. + * @return The filtered RequestedGlobalProperties, or null if no properties can be propagated down to the given input */ - public RequestedGlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { + FieldList sourceList; + RequestedGlobalProperties returnProps = null; + + if (props == null) { + return null; + } + // check if partitioning survives if (this.ordering != null) { - for (int col : this.ordering.getInvolvedIndexes()) { - if (!node.isFieldConstant(input, col)) { + Ordering no = new Ordering(); + returnProps = new RequestedGlobalProperties(); + returnProps.setPartitioningFields(new FieldSet(), this.partitioning); + + for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) { + int value = this.ordering.getInvolvedIndexes().get(index); + sourceList = props.getSourceField(input, value) == null ? 
null : props.getSourceField(input, value).toFieldList(); + if (sourceList != null) { + no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index)); + } else { return null; } } + returnProps.setOrdering(no); } else if (this.partitioningFields != null) { - for (int colIndex : this.partitioningFields) { - if (!node.isFieldConstant(input, colIndex)) { + returnProps = new RequestedGlobalProperties(); + returnProps.setPartitioningFields(new FieldSet(), this.partitioning); + for (Integer index : this.partitioningFields) { + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + returnProps.setPartitioningFields(returnProps.getPartitionedFields().addFields(sourceList), this.partitioning); + } else { return null; } } } - // make sure that certain properties are not pushed down final PartitioningProperty partitioning = this.partitioning; if (partitioning == PartitioningProperty.FULL_REPLICATION || @@ -220,8 +254,8 @@ public final class RequestedGlobalProperties implements Cloneable { { return null; } - - return this; + + return returnProps; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java index dc897bd..a5bf118 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java @@ -22,9 +22,9 @@ package org.apache.flink.compiler.dataproperties; import java.util.Arrays; import 
org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.compiler.dag.OptimizerNode; import org.apache.flink.compiler.plan.Channel; import org.apache.flink.compiler.util.Utils; import org.apache.flink.runtime.operators.util.LocalStrategy; @@ -138,29 +138,47 @@ public class RequestedLocalProperties implements Cloneable { * Filters these properties by what can be preserved through a user function's constant fields set. * Since interesting properties are filtered top-down, anything that partially destroys the ordering * makes the properties uninteresting. - * - * @param node The optimizer node that potentially modifies the properties. + * + * @param props The semantic properties of the user function that potentially modifies the properties. * @param input The input of the node which is relevant. - * - * @return True, if the resulting properties are non trivial. + * + * @return The filtered RequestedLocalProperties, or null if props is null or a requested field has no source field */ - public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public RequestedLocalProperties filterBySemanticProperties(SemanticProperties props, int input) { + FieldList sourceList; + RequestedLocalProperties returnProps = this; + + if (props == null) { + return null; + } + if (this.ordering != null) { - final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); - for (int i = 0; i < involvedIndexes.size(); i++) { - if (!node.isFieldConstant(input, involvedIndexes.get(i))) { + Ordering no = new Ordering(); + returnProps = this.clone(); + for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) { + int value = this.ordering.getInvolvedIndexes().get(index); + sourceList = props.getSourceField(input, value) == null ? 
null : props.getSourceField(input, value).toFieldList(); + if (sourceList != null) { + no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index)); + } else { return null; } } + returnProps.setOrdering(no); } else if (this.groupedFields != null) { + returnProps = this.clone(); + returnProps.setGroupedFields(new FieldList()); // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - if (!node.isFieldConstant(input, index)) { + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + returnProps.setGroupedFields(returnProps.getGroupedFields().addFields(sourceList)); + } else { return null; } } } - return this; + return returnProps; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java index 717a0c2..ab083ef 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java @@ -21,6 +21,7 @@ package org.apache.flink.compiler.postpass; import java.util.Map; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; @@ -522,7 +523,9 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im try { for (Map.Entry<Integer, X> entry : sourceSchema) { Integer pos = 
entry.getKey(); - if (optNode.isFieldConstant(input, pos)) { + SemanticProperties sprops = optNode.getSemanticProperties(); + + if (sprops != null && sprops.getForwardFields(input, pos) != null && sprops.getForwardFields(input,pos).contains(pos)) { targetSchema.addType(pos, entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java index 8b8828e..93a1fc5 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java @@ -257,8 +257,11 @@ public class DOPChangeTest extends CompilerTestBase { // mapper respectively reducer SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - - Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy()); + + SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); + + Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.NONE, red2Node.getInput().getLocalStrategy()); + Assert.assertEquals("The Map 2 Node has an invalid local strategy.", LocalStrategy.SORT, map2Node.getInput().getLocalStrategy()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java 
b/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java new file mode 100644 index 0000000..49e5d85 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java @@ -0,0 +1,941 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.compiler; + +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.LocalProperties; +import org.apache.flink.compiler.dataproperties.PartitioningProperty; +import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties; +import org.apache.flink.compiler.dataproperties.RequestedLocalProperties; +import org.apache.flink.compiler.plan.Channel; +import org.apache.flink.compiler.plan.DualInputPlanNode; +import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.PlanNode; +import org.apache.flink.compiler.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Visitor; 
+import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; + +public class SemanticPropOptimizerTest extends CompilerTestBase { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + public static class SimpleReducer implements ReduceFunction<Tuple3<Integer, Integer, Integer>> { + @Override + public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, Integer, Integer> value1, Tuple3<Integer, Integer, Integer> value2) throws Exception { + return null; + } + } + + public static class SimpleMap implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + @Override + public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception { + return null; + } + } + + @Test + public void forwardFieldsTestMapReduce() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + set = set.map(new SimpleMap()).withConstantSet("*") + .groupBy(0) + .reduce(new SimpleReducer()).withConstantSet("0->1") + .map(new SimpleMap()).withConstantSet("*") + .groupBy(1) + .reduce(new SimpleReducer()).withConstantSet("*"); + + set.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor<PlanNode>() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) { + for (Channel 
input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Reduce should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Reducer", + lprops.getGroupedFields().contains(1)); + } + } + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) { + for (Channel input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Map should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Mapper", + lprops.getGroupedFields().contains(1)); + } + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } + + @Test + public void localPropertiesFilterNothingPreservedTest() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->2", "3->1", "5->5"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forGrouping(grouping); + + LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + 
Assert.assertTrue(result.getOrdering() == null); + Assert.assertTrue(result.getGroupedFields() == null); + Assert.assertTrue(result.getUniqueFields() == null); + } + + @Test + public void localPropertiesFilterWrongInputTest() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->2", "3->1", "5->5"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forGrouping(grouping); + + try { + LocalProperties result = lprops.filterBySemanticProperties(sprops, 1); + } catch (Exception e) { + return; + } + Assert.fail(); + } + + @Test + public void localPropertiesFilterGroupingPreservedTest() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->2", "3->1", "5->5", "7->0,4"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forGrouping(grouping); + + LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result.getOrdering() == null); + + Assert.assertTrue(result.getGroupedFields().size() == 5); + Assert.assertTrue(result.getGroupedFields().contains(2)); + Assert.assertTrue(result.getGroupedFields().contains(1)); + Assert.assertTrue(result.getGroupedFields().contains(5)); + Assert.assertTrue(result.getGroupedFields().contains(0)); + Assert.assertTrue(result.getGroupedFields().contains(4)); + + Assert.assertTrue(result.getUniqueFields() == null); + } + + @Test + public void localPropertiesFilterGroupingPreservedTest2() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->0", "3->3", "5->5", "7->7"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + 
LocalProperties lprops = LocalProperties.forGrouping(grouping); + + LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result.getOrdering() == null); + + Assert.assertTrue(result.getGroupedFields().size() == 4); + Assert.assertTrue(result.getGroupedFields().contains(0)); + Assert.assertTrue(result.getGroupedFields().contains(3)); + Assert.assertTrue(result.getGroupedFields().contains(5)); + Assert.assertTrue(result.getGroupedFields().contains(7)); + + Assert.assertTrue(result.getUniqueFields() == null); + } + + @Test + public void localPropertiesFilterGroupingNothingPreservedTest() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->2", "3->1", "7->0,4"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forGrouping(grouping); + + LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result.getGroupedFields() == null); + } + + @Test + public void localPropertiesFilterOrderingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(0, null, Order.ASCENDING); + ordering.appendOrdering(1, null, Order.ASCENDING); + ordering.appendOrdering(2, null, Order.ASCENDING); + + String[] constantSet = {"0->2", "1->3", "2->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forOrdering(ordering); + + LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + FieldList involved = result.getOrdering().getInvolvedIndexes(); + Order[] orders = result.getOrdering().getFieldOrders(); + + + Assert.assertTrue(involved.size() == 3); + Assert.assertTrue(involved.contains(2)); + Assert.assertTrue(involved.contains(3)); + Assert.assertTrue(involved.contains(0)); 
+ Assert.assertTrue(orders[0] == Order.ASCENDING); + Assert.assertTrue(orders[1] == Order.ASCENDING); + Assert.assertTrue(orders[2] == Order.ASCENDING); + } + + @Test + public void localPropertiesFilterOrderingNothingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(0, null, Order.ASCENDING); + ordering.appendOrdering(1, null, Order.ASCENDING); + ordering.appendOrdering(2, null, Order.ASCENDING); + + String[] constantSet = {"0->2", "2->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forOrdering(ordering); + + LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result.getOrdering() == null); + } + + + + @Test + public void localPropertiesFilterUniqueFieldsPreservedTest() { + String[] constantSet = {"0->1", "1->2", "3->4", "2->3", "6->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + + FieldSet set1 = new FieldSet(0, 1, 2, 3); + FieldSet set2 = new FieldSet(4, 5, 6, 7); + FieldSet set3 = new FieldSet(0, 1, 6, 2); + LocalProperties lprops = new LocalProperties(); + lprops = lprops.addUniqueFields(set1).addUniqueFields(set2).addUniqueFields(set3); + + + LocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + Set<FieldSet> unique = result.getUniqueFields(); + FieldSet expected1 = new FieldSet(1, 2, 3, 4); + FieldSet expected2 = new FieldSet(0, 1, 2, 3); + + + Assert.assertTrue(unique.size() == 2); + Assert.assertTrue(unique.contains(expected1)); + Assert.assertTrue(unique.contains(expected2)); + } + + @Test + public void requestedLocalPropertiesFilterNothingPreservedTest() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->2", "3->1", "5->5"}; + SingleInputSemanticProperties sprops = 
SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops = new RequestedLocalProperties(); + lprops.setGroupedFields(grouping); + + RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result == null); + } + + @Test + public void requestedLocalPropertiesFilterWrongInputTest() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->2", "3->1", "5->5"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops = new RequestedLocalProperties(); + lprops.setGroupedFields(grouping); + + try { + RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 1); + } catch (Exception e) { + return; + } + Assert.fail(); + } + + @Test + public void requestedLocalPropertiesFilterGroupingPreservedTest() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"6->7", "1->5", "4->3", "7->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops = new RequestedLocalProperties(); + lprops.setGroupedFields(grouping); + + RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + + Assert.assertTrue(result.getGroupedFields().size() == 4); + Assert.assertTrue(result.getGroupedFields().contains(7)); + Assert.assertTrue(result.getGroupedFields().contains(4)); + Assert.assertTrue(result.getGroupedFields().contains(1)); + Assert.assertTrue(result.getGroupedFields().contains(6)); + } + + @Test + public void requestedLocalPropertiesFilterGroupingPreservedTest2() { + FieldList grouping = new FieldList().addFields(0, 3, 5, 7); + + String[] constantSet = {"0->0", "3->3", "5->5", "7->7"}; + SingleInputSemanticProperties 
sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops = new RequestedLocalProperties(); + lprops.setGroupedFields(grouping); + + RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + + Assert.assertTrue(result.getGroupedFields().size() == 4); + Assert.assertTrue(result.getGroupedFields().contains(0)); + Assert.assertTrue(result.getGroupedFields().contains(3)); + Assert.assertTrue(result.getGroupedFields().contains(5)); + Assert.assertTrue(result.getGroupedFields().contains(7)); + } + + @Test + public void requestedLocalPropertiesFilterOrderingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(2, null, Order.ASCENDING); + ordering.appendOrdering(3, null, Order.ASCENDING); + ordering.appendOrdering(0, null, Order.ASCENDING); + + String[] constantSet = {"6->2", "2->3", "5->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops = new RequestedLocalProperties(); + lprops.setOrdering(ordering); + + RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + FieldList involved = result.getOrdering().getInvolvedIndexes(); + Order[] orders = result.getOrdering().getFieldOrders(); + + Assert.assertTrue(involved.size() == 3); + Assert.assertTrue(involved.contains(6)); + Assert.assertTrue(involved.contains(2)); + Assert.assertTrue(involved.contains(5)); + Assert.assertTrue(orders[0] == Order.ASCENDING); + Assert.assertTrue(orders[1] == Order.ASCENDING); + Assert.assertTrue(orders[2] == Order.ASCENDING); + } + + @Test + public void requestedLocalPropertiesFilterOrderingPreservedTest2() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(0, null, Order.ASCENDING); + ordering.appendOrdering(3, null, Order.ASCENDING); + ordering.appendOrdering(6, null, Order.ASCENDING); + + 
String[] constantSet = {"0->0", "3->3", "6->6"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops = new RequestedLocalProperties(); + lprops.setOrdering(ordering); + + RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + FieldList involved = result.getOrdering().getInvolvedIndexes(); + Order[] orders = result.getOrdering().getFieldOrders(); + + Assert.assertTrue(involved.size() == 3); + Assert.assertTrue(involved.contains(0)); + Assert.assertTrue(involved.contains(3)); + Assert.assertTrue(involved.contains(6)); + Assert.assertTrue(orders[0] == Order.ASCENDING); + Assert.assertTrue(orders[1] == Order.ASCENDING); + Assert.assertTrue(orders[2] == Order.ASCENDING); + } + + @Test + public void requestedLocalPropertiesFilterOrderingNothingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(2, null, Order.ASCENDING); + ordering.appendOrdering(3, null, Order.ASCENDING); + ordering.appendOrdering(0, null, Order.ASCENDING); + + String[] constantSet = {"6->2", "5->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops = new RequestedLocalProperties(); + lprops.setOrdering(ordering); + + RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result == null); + } + + @Test + public void requestedLocalPropertiesDualFilterOrderingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(2, null, Order.ASCENDING); + ordering.appendOrdering(3, null, Order.ASCENDING); + ordering.appendOrdering(0, null, Order.ASCENDING); + + String[] constantSet = {"6->2", "2->3", "5->0"}; + DualInputSemanticProperties dprops = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(dprops, 
constantSet, constantSet, null, null, null, null, tupleInfo, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops1 = new RequestedLocalProperties(); + lprops1.setOrdering(ordering); + + RequestedLocalProperties lprops2 = new RequestedLocalProperties(); + lprops2.setOrdering(ordering); + + RequestedLocalProperties result1 = lprops1.filterBySemanticProperties(dprops, 0); + RequestedLocalProperties result2 = lprops2.filterBySemanticProperties(dprops, 1); + + FieldSet involved1 = result1.getOrdering().getInvolvedIndexes(); + FieldSet involved2 = result2.getOrdering().getInvolvedIndexes(); + Order[] orders1 = result1.getOrdering().getFieldOrders(); + Order[] orders2 = result2.getOrdering().getFieldOrders(); + + + Assert.assertTrue(involved1.size() == 3); + Assert.assertTrue(involved1.contains(6)); + Assert.assertTrue(involved1.contains(2)); + Assert.assertTrue(involved1.contains(5)); + Assert.assertTrue(orders1[0] == Order.ASCENDING); + Assert.assertTrue(orders1[1] == Order.ASCENDING); + Assert.assertTrue(orders1[2] == Order.ASCENDING); + + Assert.assertTrue(involved2.size() == 3); + Assert.assertTrue(involved2.contains(6)); + Assert.assertTrue(involved2.contains(2)); + Assert.assertTrue(involved2.contains(5)); + Assert.assertTrue(orders2[0] == Order.ASCENDING); + Assert.assertTrue(orders2[1] == Order.ASCENDING); + Assert.assertTrue(orders2[2] == Order.ASCENDING); + } + + @Test + public void globalPropertiesFilterNothingPreservedTest() { + String[] constantSet = {"0->2", "3->1", "5->5"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result.getOrdering() == null); + Assert.assertTrue(result.getPartitioningFields() == null); + } + + @Test + public void globalPropertiesFilterWrongInputTest() { + String[] constantSet = {"0->2", 
"3->1", "5->5"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 3, 4, 6)); + + try { + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 1); + } catch (Exception e) { + return; + } + Assert.fail(); + } + + @Test + public void globalPropertiesFilterPartitioningPreservedTest() { + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + String[] constantSet = {"0->2", "1->4", "4->7"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldList list = result.getPartitioningFields(); + Assert.assertTrue(list.size() == 3); + Assert.assertTrue(list.contains(2)); + Assert.assertTrue(list.contains(4)); + Assert.assertTrue(list.contains(7)); + Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + } + + @Test + public void globalPropertiesFilterPartitioningPreservedTest2() { + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + String[] constantSet = {"0->2, 6", "1->4, 5", "4->7"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldList list = result.getPartitioningFields(); + Assert.assertTrue(list.size() == 5); + Assert.assertTrue(list.contains(2)); + Assert.assertTrue(list.contains(4)); + Assert.assertTrue(list.contains(6)); + Assert.assertTrue(list.contains(5)); + Assert.assertTrue(list.contains(7)); + Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); 
+ } + + @Test + public void globalPropertiesFilterPartitioningPreservedTest3() { + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + String[] constantSet = {"0->0", "1->1", "4->4"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldList list = result.getPartitioningFields(); + Assert.assertTrue(list.size() == 3); + Assert.assertTrue(list.contains(0)); + Assert.assertTrue(list.contains(1)); + Assert.assertTrue(list.contains(4)); + Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + } + + @Test + public void globalPropertiesFilterOrderingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(0, null, Order.ASCENDING); + ordering.appendOrdering(1, null, Order.ASCENDING); + ordering.appendOrdering(2, null, Order.ASCENDING); + + String[] constantSet = {"0->2", "1->3", "2->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setOrdering(ordering); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldList involved = result.getOrdering().getInvolvedIndexes(); + Order[] orders = result.getOrdering().getFieldOrders(); + + + Assert.assertTrue(involved.size() == 3); + Assert.assertTrue(involved.contains(2)); + Assert.assertTrue(involved.contains(3)); + Assert.assertTrue(involved.contains(0)); + Assert.assertTrue(orders[0] == Order.ASCENDING); + Assert.assertTrue(orders[1] == Order.ASCENDING); + Assert.assertTrue(orders[2] == Order.ASCENDING); + } + + @Test + public void globalPropertiesFilterOrderingPreservedTest2() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(0, null, 
Order.ASCENDING); + ordering.appendOrdering(1, null, Order.ASCENDING); + ordering.appendOrdering(2, null, Order.ASCENDING); + + String[] constantSet = {"0->2,4", "1->3", "2->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setOrdering(ordering); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldList involved = result.getOrdering().getInvolvedIndexes(); + Order[] orders = result.getOrdering().getFieldOrders(); + + + Assert.assertTrue(involved.size() == 4); + Assert.assertTrue(involved.contains(2)); + Assert.assertTrue(involved.contains(3)); + Assert.assertTrue(involved.contains(4)); + Assert.assertTrue(involved.contains(0)); + Assert.assertTrue(orders[0] == Order.ASCENDING); + Assert.assertTrue(orders[1] == Order.ASCENDING); + Assert.assertTrue(orders[2] == Order.ASCENDING); + Assert.assertTrue(orders[3] == Order.ASCENDING); + } + + @Test + public void globalPropertiesFilterOrderingPreservedTest3() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(0, null, Order.ASCENDING); + ordering.appendOrdering(1, null, Order.ASCENDING); + ordering.appendOrdering(2, null, Order.ASCENDING); + + String[] constantSet = {"0->0", "1->1", "2->2"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setOrdering(ordering); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldList involved = result.getOrdering().getInvolvedIndexes(); + Order[] orders = result.getOrdering().getFieldOrders(); + + + Assert.assertTrue(involved.size() == 3); + Assert.assertTrue(involved.contains(0)); + Assert.assertTrue(involved.contains(1)); + Assert.assertTrue(involved.contains(2)); + Assert.assertTrue(orders[0] == 
Order.ASCENDING); + Assert.assertTrue(orders[1] == Order.ASCENDING); + Assert.assertTrue(orders[2] == Order.ASCENDING); + } + + @Test + public void globalPropertiesFilterOrderingNothingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(0, null, Order.ASCENDING); + ordering.appendOrdering(1, null, Order.ASCENDING); + ordering.appendOrdering(2, null, Order.ASCENDING); + + String[] constantSet = {"0->2", "2->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setOrdering(ordering); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result.getOrdering() == null); + } + + @Test + public void globalPropertiesFilterUniqueFieldsPreservedTest() { + String[] constantSet = {"0->1", "1->2", "3->4", "2->3", "6->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + + FieldSet set1 = new FieldSet(0, 1, 2, 3); + FieldSet set2 = new FieldSet(4, 5, 6, 7); + FieldSet set3 = new FieldSet(0, 1, 6, 2); + GlobalProperties gprops = new GlobalProperties(); + gprops.addUniqueFieldCombination(set1); + gprops.addUniqueFieldCombination(set2); + gprops.addUniqueFieldCombination(set3); + + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + Set<FieldSet> unique = result.getUniqueFieldCombination(); + FieldSet expected1 = new FieldSet(1, 2, 3, 4); + FieldSet expected2 = new FieldSet(0, 1, 2, 3); + + + Assert.assertTrue(unique.size() == 2); + Assert.assertTrue(unique.contains(expected1)); + Assert.assertTrue(unique.contains(expected2)); + } + + @Test + public void requestedGlobalPropertiesFilterNothingPreservedTest() { + String[] constantSet = {"0->2", "3->1", "5->5"}; + SingleInputSemanticProperties sprops = 
SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result == null); + } + + @Test + public void requestedGlobalPropertiesDualFilterEverythingPreservedTest() { + String[] constantSet1 = {"0->1,2", "3->0", "2->4"}; + String[] constantSet2 = {"0->3", "4->6", "6->7"}; + + + DualInputSemanticProperties dprops = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet1, constantSet2, null, null, null, null, tupleInfo, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops1 = new RequestedGlobalProperties(); + RequestedGlobalProperties gprops2 = new RequestedGlobalProperties(); + + gprops1.setHashPartitioned(new FieldSet(2, 0, 4)); + gprops2.setHashPartitioned(new FieldSet(3, 6, 7)); + + gprops1 = gprops1.filterBySemanticProperties(dprops, 0); + gprops2 = gprops2.filterBySemanticProperties(dprops, 1); + + Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue(gprops1.getPartitionedFields().size() == 3); + Assert.assertTrue(gprops1.getPartitionedFields().contains(0)); + Assert.assertTrue(gprops1.getPartitionedFields().contains(3)); + Assert.assertTrue(gprops1.getPartitionedFields().contains(2)); + + Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue(gprops2.getPartitionedFields().size() == 3); + Assert.assertTrue(gprops2.getPartitionedFields().contains(0)); + Assert.assertTrue(gprops2.getPartitionedFields().contains(4)); + Assert.assertTrue(gprops2.getPartitionedFields().contains(6)); + } + + @Test + public void requestedGlobalPropertiesDualFilterNothingPreservedTest() { + String[] constantSet1 = {"0->1,2", "3->0", "2->4"}; + String[] constantSet2 = {"0->3", "4->6", "6->7"}; + + + 
DualInputSemanticProperties dprops = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet1, constantSet2, null, null, null, null, tupleInfo, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops1 = new RequestedGlobalProperties(); + RequestedGlobalProperties gprops2 = new RequestedGlobalProperties(); + + gprops1.setHashPartitioned(new FieldSet(6, 7)); + gprops2.setHashPartitioned(new FieldSet(3, 6, 7)); + + gprops1 = gprops1.filterBySemanticProperties(dprops, 0); + gprops2 = gprops2.filterBySemanticProperties(dprops, 1); + + Assert.assertTrue(gprops1 == null); + + Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue(gprops2.getPartitionedFields().size() == 3); + Assert.assertTrue(gprops2.getPartitionedFields().contains(0)); + Assert.assertTrue(gprops2.getPartitionedFields().contains(4)); + Assert.assertTrue(gprops2.getPartitionedFields().contains(6)); + } + + @Test + public void requestedGlobalPropertiesFilterWrongInputTest() { + String[] constantSet = {"0->2", "3->1", "5->5"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 3, 4, 6)); + + try { + RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 1); + } catch (Exception e) { + return; + } + Assert.fail(); + } + + @Test + public void requestedGlobalPropertiesFilterPartitioningPreservedTest() { + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + String[] constantSet = {"3->0", "5->1", "2->4"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + + RequestedGlobalProperties result = 
gprops.filterBySemanticProperties(sprops, 0); + + FieldSet set = result.getPartitionedFields(); + Assert.assertTrue(set.size() == 3); + Assert.assertTrue(set.contains(3)); + Assert.assertTrue(set.contains(5)); + Assert.assertTrue(set.contains(2)); + Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + } + + @Test + public void requestedGlobalPropertiesFilterPartitioningPreservedTest2() { + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + gprops.setHashPartitioned(new FieldList(3, 5, 2)); + + String[] constantSet = {"3->3", "5->5", "2->2"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + + RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldSet set = result.getPartitionedFields(); + Assert.assertTrue(set.size() == 3); + Assert.assertTrue(set.contains(3)); + Assert.assertTrue(set.contains(5)); + Assert.assertTrue(set.contains(2)); + Assert.assertTrue(result.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + } + + @Test + public void semanticPropertySourceFieldTest() { + SingleInputSemanticProperties props = new SingleInputSemanticProperties(); + props.addForwardedField(2, 4); + props.addForwardedField(3, 8); + props.addForwardedField(4, 10); + props.addForwardedField(4, 11); + + Assert.assertTrue(props.getSourceField(0, 4).size() == 1); + Assert.assertTrue(props.getSourceField(0, 4).contains(2)); + Assert.assertTrue(props.getSourceField(0, 8).size() == 1); + Assert.assertTrue(props.getSourceField(0, 8).contains(3)); + Assert.assertTrue(props.getSourceField(0, 10).size() == 1); + Assert.assertTrue(props.getSourceField(0, 10).contains(4)); + Assert.assertTrue(props.getSourceField(0, 11).size() == 1); + Assert.assertTrue(props.getSourceField(0, 11).contains(4)); + } + + @Test + public void requestedGlobalPropertiesFilterOrderingPreservedTest() { + Ordering ordering = new 
Ordering(); + ordering.appendOrdering(2, null, Order.ASCENDING); + ordering.appendOrdering(3, null, Order.ASCENDING); + ordering.appendOrdering(0, null, Order.ASCENDING); + + String[] constantSet = {"6->2", "2->3", "5->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + gprops.setOrdering(ordering); + + RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + FieldList involved = result.getOrdering().getInvolvedIndexes(); + Order[] orders = result.getOrdering().getFieldOrders(); + + + Assert.assertTrue(involved.size() == 3); + Assert.assertTrue(involved.contains(6)); + Assert.assertTrue(involved.contains(2)); + Assert.assertTrue(involved.contains(5)); + Assert.assertTrue(orders[0] == Order.ASCENDING); + Assert.assertTrue(orders[1] == Order.ASCENDING); + Assert.assertTrue(orders[2] == Order.ASCENDING); + } + + @Test + public void requestedGlobalPropertiesFilterOrderingNothingPreservedTest() { + Ordering ordering = new Ordering(); + ordering.appendOrdering(2, null, Order.ASCENDING); + ordering.appendOrdering(3, null, Order.ASCENDING); + ordering.appendOrdering(0, null, Order.ASCENDING); + + String[] constantSet = {"6->2", "5->0"}; + SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + gprops.setOrdering(ordering); + + RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + Assert.assertTrue(result == null); + } + + @Test + public void forwardFieldsTestJoin() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, 
Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + in1 = in1.map(new SimpleMap()).withConstantSet("*") + .groupBy(0) + .reduce(new SimpleReducer()).withConstantSet("0->1"); + in2 = in2.map(new SimpleMap()).withConstantSet("*") + .groupBy(1) + .reduce(new SimpleReducer()).withConstantSet("1->2"); + DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new JoinFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() { + @Override + public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception { + return null; + } + }); + + out.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor<PlanNode>() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) { + DualInputPlanNode node = ((DualInputPlanNode) visitable); + + final Channel inConn1 = node.getInput1(); + final Channel inConn2 = node.getInput2(); + + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn1.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn2.getShipStrategy() == ShipStrategyType.FORWARD); + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java index e9aa358..fe53380 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java @@ -58,7 +58,40 @@ public class DualInputSemanticProperties extends SemanticProperties { public DualInputSemanticProperties() { init(); } - + + /** + * Finds the source field(s) recorded in forwardedFields1 from which the given field was forwarded. + * @param dest The destination field in the output data. + * @return FieldSet containing the source input fields, or null if no entry forwards to dest. + */ + public FieldSet forwardedFrom1(int dest) { + FieldSet fs = null; + for (Map.Entry<Integer, FieldSet> entry : forwardedFields1.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } + + /** Same lookup as forwardedFrom1, but over forwardedFields2: collects every key whose destination set contains dest, and returns null when there is none. */ public FieldSet forwardedFrom2(int dest) { + FieldSet fs = null; + for (Map.Entry<Integer, FieldSet> entry : forwardedFields2.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } + /** * Adds, to the existing information, a field that is forwarded directly * from the source record(s) in the first input to the destination @@ -115,6 +148,10 @@ public class DualInputSemanticProperties extends SemanticProperties { * @return the destination fields, or null if they do not exist */ public FieldSet getForwardedField1(int sourceField) { + /* when isAllFieldsConstant() holds, the field is reported as forwarded to the same index and the map is not consulted */ if (isAllFieldsConstant()) { + return new FieldSet(sourceField); + } + return this.forwardedFields1.get(sourceField); } @@ -174,9 +211,43 @@ public class DualInputSemanticProperties extends SemanticProperties { * @return the destination fields, or null if they do not exist */ public FieldSet getForwardedField2(int sourceField) { + /* when isAllFieldsConstant() holds, the field is reported as forwarded to the same index and the map is not consulted */ if (isAllFieldsConstant()) { + return new
FieldSet(sourceField); + } + return this.forwardedFields2.get(sourceField); } - + + /** Returns the source field(s) forwarded to the given output field: the field itself when isAllFieldsConstant() holds, otherwise forwardedFrom1(field) for input 0 and forwardedFrom2(field) for input 1 (either may be null). Throws IndexOutOfBoundsException for any other input index. */ @Override + public FieldSet getSourceField(int input, int field) { + if (isAllFieldsConstant()) { + return new FieldSet(field); + } + + switch(input) { + case 0: + return this.forwardedFrom1(field); + case 1: + return this.forwardedFrom2(field); + default: + throw new IndexOutOfBoundsException(); + } + } + + /** Returns the destination field(s) of the given source field: the field itself when isAllFieldsConstant() holds, otherwise getForwardedField1(field) for input 0 and getForwardedField2(field) for input 1. NOTE(review): any other input index yields null here, whereas getSourceField throws IndexOutOfBoundsException; confirm the asymmetry is intended. */ @Override + public FieldSet getForwardFields(int input, int field) { + if (isAllFieldsConstant()) { + return new FieldSet(field); + } + + if (input == 0) { + return this.getForwardedField1(field); + } else if (input == 1) { + return this.getForwardedField2(field); + } + return null; + } + /** * Adds, to the existing information, field(s) that are read in * the source record(s) from the first input. @@ -253,7 +324,7 @@ public class DualInputSemanticProperties extends SemanticProperties { super.clearProperties(); init(); } - + @Override public boolean isEmpty() { return super.isEmpty() && @@ -263,7 +334,11 @@ public class DualInputSemanticProperties extends SemanticProperties { (readFields2 == null || readFields2.size() == 0); } - + @Override + public String toString() { + return "DISP(" + this.forwardedFields1 + "; " + this.forwardedFields2 + ")"; + } + private void init() { this.forwardedFields1 = new HashMap<Integer,FieldSet>(); this.forwardedFields2 = new HashMap<Integer,FieldSet>();
