[FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked semantic annotations for functions. - Renamed constantField annotations to forwardedFields annotations - Forwarded fields can be defined for (nested) tuples, Pojos, case classes - Added semantic function information to example programs
This closes #311 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de8e066c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de8e066c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de8e066c Branch: refs/heads/master Commit: de8e066ccbd0a31e5746bc0bee524a48bba3a552 Parents: 78f41e9 Author: Fabian Hueske <[email protected]> Authored: Wed Dec 17 18:59:14 2014 +0100 Committer: Fabian Hueske <[email protected]> Committed: Wed Jan 28 01:39:01 2015 +0100 ---------------------------------------------------------------------- .../spargel/java/VertexCentricIteration.java | 2 +- .../spargel/java/SpargelTranslationTest.java | 8 +- .../aggregation/ComparableAggregator.java | 5 +- .../api/function/aggregation/SumAggregator.java | 4 +- .../dag/AbstractPartialSolutionNode.java | 3 +- .../flink/compiler/dag/BinaryUnionNode.java | 37 +- .../flink/compiler/dag/BulkIterationNode.java | 3 +- .../apache/flink/compiler/dag/DataSinkNode.java | 3 +- .../flink/compiler/dag/DataSourceNode.java | 3 +- .../apache/flink/compiler/dag/FilterNode.java | 6 +- .../flink/compiler/dag/OptimizerNode.java | 28 - .../flink/compiler/dag/PartitionNode.java | 7 +- .../flink/compiler/dag/UnaryOperatorNode.java | 4 +- .../compiler/dag/WorksetIterationNode.java | 3 +- .../dataproperties/GlobalProperties.java | 156 +- .../dataproperties/InterestingProperties.java | 8 +- .../dataproperties/LocalProperties.java | 139 +- .../RequestedGlobalProperties.java | 99 +- .../RequestedLocalProperties.java | 49 +- .../postpass/GenericFlatTypePostPass.java | 2 +- .../apache/flink/compiler/DOPChangeTest.java | 7 +- .../flink/compiler/IterationsCompilerTest.java | 6 +- .../compiler/SemanticPropOptimizerTest.java | 941 --------- .../SemanticPropertiesAPIToPlanTest.java | 182 ++ .../flink/compiler/SortPartialReuseTest.java | 8 +- .../CoGroupCustomPartitioningTest.java | 4 +- .../JoinCustomPartitioningTest.java | 4 +- 
.../GlobalPropertiesFilteringTest.java | 418 +++- .../GlobalPropertiesMatchingTest.java | 3 +- .../GlobalPropertiesPushdownTest.java | 29 +- .../LocalPropertiesFilteringTest.java | 376 ++++ .../dataproperties/MockDistribution.java | 49 + .../dataproperties/MockPartitioner.java | 5 +- .../RequestedGlobalPropertiesFilteringTest.java | 433 +++++ .../RequestedLocalPropertiesFilteringTest.java | 248 +++ .../WorksetIterationsJavaApiCompilerTest.java | 6 +- .../api/common/operators/DualInputOperator.java | 2 +- .../operators/DualInputSemanticProperties.java | 329 +--- .../flink/api/common/operators/Ordering.java | 2 +- .../common/operators/SemanticProperties.java | 103 +- .../common/operators/SingleInputOperator.java | 2 +- .../SingleInputSemanticProperties.java | 239 +-- .../operators/base/PartitionOperatorBase.java | 2 +- .../api/common/typeutils/CompositeType.java | 49 +- .../DualInputSemanticPropertiesTest.java | 255 +++ .../SingleInputSemanticPropertiesTest.java | 183 ++ .../flink/examples/java/clustering/KMeans.java | 9 +- .../java/graph/ConnectedComponents.java | 16 +- .../examples/java/graph/EnumTrianglesBasic.java | 3 + .../examples/java/graph/EnumTrianglesOpt.java | 4 + .../examples/java/graph/PageRankBasic.java | 6 +- .../java/graph/TransitiveClosureNaive.java | 6 +- .../examples/java/ml/LinearRegression.java | 3 + .../java/relational/WebLogAnalysis.java | 2 + .../examples/scala/clustering/KMeans.scala | 8 +- .../scala/graph/ConnectedComponents.scala | 6 +- .../examples/scala/graph/DeltaPageRank.scala | 3 +- .../scala/graph/EnumTrianglesBasic.scala | 3 + .../examples/scala/graph/EnumTrianglesOpt.scala | 5 +- .../examples/scala/graph/PageRankBasic.scala | 4 +- .../scala/graph/TransitiveClosureNaive.scala | 6 +- .../scala/relational/WebLogAnalysis.scala | 4 +- .../api/java/functions/FunctionAnnotation.java | 586 +++--- .../api/java/functions/SemanticPropUtil.java | 697 ++++--- .../flink/api/java/operators/CrossOperator.java | 13 +- 
.../api/java/operators/DistinctOperator.java | 3 +- .../flink/api/java/operators/JoinOperator.java | 39 +- .../apache/flink/api/java/operators/Keys.java | 8 +- .../api/java/operators/ProjectOperator.java | 3 +- .../java/operators/SingleInputUdfOperator.java | 98 +- .../api/java/operators/TwoInputUdfOperator.java | 188 +- .../translation/KeyExtractingMapper.java | 3 +- .../translation/KeyRemovingMapper.java | 3 +- .../translation/PlanFilterOperator.java | 3 +- .../record/functions/FunctionAnnotation.java | 233 +-- .../flink/api/java/typeutils/PojoTypeInfo.java | 171 +- .../api/java/typeutils/TupleTypeInfoBase.java | 169 +- .../java/functions/SemanticPropUtilTest.java | 1813 ++++++++++++------ .../SemanticPropertiesProjectionTest.java | 383 ++-- .../SemanticPropertiesTranslationTest.java | 798 +++++--- .../flink/api/java/operators/KeysTest.java | 21 +- .../record/CoGroupWrappingFunctionTest.java | 19 +- .../java/record/ReduceWrappingFunctionTest.java | 19 +- .../type/extractor/PojoTypeExtractionTest.java | 18 +- .../java/type/extractor/TypeExtractorTest.java | 14 +- .../api/java/typeutils/CompositeTypeTest.java | 178 ++ .../typeutils/runtime/PojoSerializerTest.java | 2 +- .../org/apache/flink/api/scala/DataSet.scala | 18 +- .../api/scala/typeutils/CaseClassTypeInfo.scala | 165 +- .../CoGroupConnectedComponentsSecondITCase.java | 8 +- .../SemanticPropertiesTranslationTest.scala | 152 +- .../scala/types/TypeInformationGenTest.scala | 139 ++ 92 files changed, 6689 insertions(+), 3817 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java index a3a19d3..4f84467 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java @@ -350,7 +350,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver } // let the operator know that we preserve the key field - updates.withConstantSetFirst("0").withConstantSetSecond("0"); + updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0"); return iteration.closeWith(updates, updates); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java index 96692ef..b31618c 100644 --- a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java +++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java @@ -100,8 +100,8 @@ public class SpargelTranslationTest { // validate that the semantic properties are set as they should TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset(); - assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0)); - assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0)); + assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0)); + assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0)); TwoInputUdfOperator<?, ?, 
?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1(); @@ -179,8 +179,8 @@ public class SpargelTranslationTest { // validate that the semantic properties are set as they should TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset(); - assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0)); - assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0)); + assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0)); + assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0)); TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1(); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java index 7ea7ba1..226c45a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.function.aggregation; import java.lang.reflect.Array; import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; @@ -189,9 +188,7 @@ public abstract class 
ComparableAggregator<T> extends AggregationFunction<T> { @SuppressWarnings("unchecked") CompositeType<T> cType = (CompositeType<T>) typeInfo; - List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>(); - cType.getKey(field, 0, fieldDescriptors); - + List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field); int logicalKeyPosition = fieldDescriptors.get(0).getPosition(); if (cType instanceof PojoTypeInfo) { http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java index 384b4f6..142028b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.function.aggregation; import java.lang.reflect.Array; import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; @@ -138,8 +137,7 @@ public abstract class SumAggregator { @SuppressWarnings("unchecked") CompositeType<T> cType = (CompositeType<T>) type; - List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>(); - cType.getKey(field, 0, fieldDescriptors); + List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field); int logicalKeyPosition = fieldDescriptors.get(0).getPosition(); Class<?> keyClass = 
fieldDescriptors.get(0).getType().getTypeClass(); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java index c54076d..d996fe9 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.costs.CostEstimator; import org.apache.flink.compiler.plan.PlanNode; @@ -88,7 +89,7 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode { @Override public SemanticProperties getSemanticProperties() { - return null; + return new EmptySemanticProperties(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java index 5d805a9..bbe9563 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java @@ -24,9 +24,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; 
-import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.Union; +import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.costs.CostEstimator; @@ -255,9 +255,7 @@ public class BinaryUnionNode extends TwoInputNode { @Override public SemanticProperties getSemanticProperties() { - DualInputSemanticProperties sprops = new DualInputSemanticProperties(); - sprops.setAllFieldsConstant(true); - return sprops; + return new UnionSemanticProperties(); } @Override @@ -270,4 +268,35 @@ public class BinaryUnionNode extends TwoInputNode { this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ? in1.estimatedOutputSize + in2.estimatedOutputSize : -1; } + + public static class UnionSemanticProperties implements SemanticProperties { + + @Override + public FieldSet getForwardingTargetFields(int input, int sourceField) { + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException("Invalid input index for binary union node."); + } + + return new FieldSet(sourceField); + } + + @Override + public int getForwardingSourceField(int input, int targetField) { + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } + + return targetField; + } + + @Override + public FieldSet getReadFields(int input) { + if (input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } + + return FieldSet.EMPTY_SET; + } + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java index 18ac4c2..43b5799 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; import org.apache.flink.api.common.operators.base.BulkIterationBase; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.compiler.CompilerException; @@ -186,7 +187,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode @Override public SemanticProperties getSemanticProperties() { - return null; + return new EmptySemanticProperties(); } protected void readStubAnnotations() {} http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java index 2d4fb30..aa80451 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.DataStatistics; import 
org.apache.flink.compiler.costs.CostEstimator; @@ -234,7 +235,7 @@ public class DataSinkNode extends OptimizerNode { @Override public SemanticProperties getSemanticProperties() { - return null; + return new EmptySemanticProperties(); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java index 752a763..10c77ca 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.costs.CostEstimator; @@ -195,7 +196,7 @@ public class DataSourceNode extends OptimizerNode { @Override public SemanticProperties getSemanticProperties() { - return null; + return new EmptySemanticProperties(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java 
index 140734c..33b2049 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java @@ -52,11 +52,7 @@ public class FilterNode extends SingleInputNode { @Override public SemanticProperties getSemanticProperties() { - - SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); - sprops.setAllFieldsConstant(true); - - return sprops; + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java index 75becf8..b717560 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java @@ -666,35 +666,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat // ------------------------------------------------------------------------ // Access of stub annotations // ------------------------------------------------------------------------ - - /** - * Returns the key columns for the specific input, if all keys are preserved - * by this node. Null, otherwise. - */ - protected int[] getConstantKeySet(int input) { - Operator<?> contract = getPactContract(); - if (contract instanceof AbstractUdfOperator<?, ?>) { - AbstractUdfOperator<?, ?> abstractPact = (AbstractUdfOperator<?, ?>) contract; - int[] keyColumns = abstractPact.getKeyColumns(input); - if (keyColumns != null) { - if (keyColumns.length == 0) { - return null; - } - for (int keyColumn : keyColumns) { - FieldSet fs = getSemanticProperties() == null ? 
null : getSemanticProperties().getForwardFields(input, keyColumn); - if (fs == null) { - return null; - } else if (!fs.contains(keyColumn)) { - return null; - } - } - return keyColumns; - } - } - return null; - } - /** * An optional method where nodes can describe which fields will be unique in their output. */ http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java index 53b5dd9..75961bf 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.List; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.base.PartitionOperatorBase; import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod; import org.apache.flink.api.common.operators.util.FieldSet; @@ -74,9 +76,8 @@ public class PartitionNode extends SingleInputNode { } @Override - public boolean isFieldConstant(int input, int fieldNumber) { - // Partition does not change any data - return true; + public SemanticProperties getSemanticProperties() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java 
---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java index aaf0a10..90ba480 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java @@ -59,9 +59,7 @@ public class UnaryOperatorNode extends SingleInputNode { @Override public SemanticProperties getSemanticProperties() { - SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); - sprops.setAllFieldsConstant(true); - return sprops; + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java index 95fc066..0557633 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; import org.apache.flink.api.common.operators.base.DeltaIterationBase; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.typeinfo.NothingTypeInfo; @@ -222,7 +223,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode @Override public SemanticProperties getSemanticProperties() { - 
return null; + return new EmptySemanticProperties(); } protected void readStubAnnotations() {} http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java index 4fe632a..fb1f1a2 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java @@ -19,7 +19,6 @@ package org.apache.flink.compiler.dataproperties; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import org.apache.flink.api.common.functions.Partitioner; @@ -32,6 +31,8 @@ import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.plan.Channel; import org.apache.flink.compiler.util.Utils; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class represents global properties of the data at a certain point in the plan. @@ -41,6 +42,8 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType; * or an FieldSet with the hash partitioning columns. 
*/ public class GlobalProperties implements Cloneable { + + public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class); private PartitioningProperty partitioning; // the type partitioning @@ -213,18 +216,6 @@ public class GlobalProperties implements Cloneable { } } - public Ordering getOrdering() { - return this.ordering; - } - - public void setOrdering(Ordering ordering) { - this.ordering = ordering; - } - - public void setPartitioningFields(FieldList partitioningFields) { - this.partitioningFields = partitioningFields; - } - public boolean isFullyReplicated() { return this.partitioning == PartitioningProperty.FULL_REPLICATION; } @@ -246,83 +237,114 @@ public class GlobalProperties implements Cloneable { } /** - * Filters these GlobalProperties by the fields that are constant or forwarded to another output field. + * Filters these GlobalProperties by the fields that are forwarded to the output + * as described by the SemanticProperties. * - * @param props The node representing the contract. + * @param props The semantic properties holding information about forwarded fields. * @param input The index of the input. * @return The filtered GlobalProperties */ public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { - // check if partitioning survives - FieldList forwardFields = null; - GlobalProperties returnProps = this; if (props == null) { - return new GlobalProperties(); + throw new NullPointerException("SemanticProperties may not be null."); } - if (this.ordering != null) { - Ordering no = new Ordering(); - for (int index : this.ordering.getInvolvedIndexes()) { - forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList(); - if (forwardFields == null) { - returnProps = new GlobalProperties(); - no = null; - break; - } else { - returnProps = returnProps == this ? 
this.clone() : returnProps; - for (int i = 0; i < forwardFields.size(); i++) { - no.appendOrdering(forwardFields.get(i), this.ordering.getType(index), this.ordering.getOrder(index)); + GlobalProperties gp = new GlobalProperties(); + + // filter partitioning + switch(this.partitioning) { + case FULL_REPLICATION: + return gp; + case RANGE_PARTITIONED: + // check if ordering is preserved + Ordering newOrdering = new Ordering(); + for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) { + int sourceField = this.ordering.getInvolvedIndexes().get(i); + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + + if (targetField == null || targetField.size() == 0) { + // partitioning is destroyed + newOrdering = null; + break; + } else { + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); + } + newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i)); } } - returnProps.setOrdering(no); - } - } - if (this.partitioningFields != null) { - returnProps = returnProps == this ? this.clone() : returnProps; - returnProps.setPartitioningFields(new FieldList()); + if(newOrdering != null) { + gp.partitioning = PartitioningProperty.RANGE_PARTITIONED; + gp.ordering = newOrdering; + gp.partitioningFields = newOrdering.getInvolvedIndexes(); + } + break; + case HASH_PARTITIONED: + case ANY_PARTITIONING: + case CUSTOM_PARTITIONING: + FieldList newPartitioningFields = new FieldList(); + for (int sourceField : this.partitioningFields) { + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); - for (int index : this.partitioningFields) { - forwardFields = props.getForwardFields(input, index) == null ? 
null: props.getForwardFields(input, index).toFieldList(); - if (forwardFields == null) { - returnProps = new GlobalProperties(); - break; - } else { - returnProps.setPartitioningFields(returnProps.getPartitioningFields().addFields(forwardFields)); + if (targetField == null || targetField.size() == 0) { + newPartitioningFields = null; + break; + } else { + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); + } + newPartitioningFields = newPartitioningFields.addField(targetField.toArray()[0]); + } } - } + if(newPartitioningFields != null) { + gp.partitioning = this.partitioning; + gp.partitioningFields = newPartitioningFields; + gp.customPartitioner = this.customPartitioner; + } + break; + case FORCED_REBALANCED: + case RANDOM: + gp.partitioning = this.partitioning; + break; + default: + throw new RuntimeException("Unknown partitioning type."); } + + // filter unique field combinations if (this.uniqueFieldCombinations != null) { - HashSet<FieldSet> newSet = new HashSet<FieldSet>(); - newSet.addAll(this.uniqueFieldCombinations); - for (Iterator<FieldSet> combos = this.uniqueFieldCombinations.iterator(); combos.hasNext(); ){ - FieldSet current = combos.next(); - FieldSet nfs = new FieldSet(); - for (Integer field : current) { - if (props.getForwardFields(input, field) == null) { - newSet.remove(current); - nfs = null; + Set<FieldSet> newUniqueFieldCombinations = new HashSet<FieldSet>(); + for (FieldSet fieldCombo : this.uniqueFieldCombinations) { + FieldSet newFieldCombo = new FieldSet(); + for (Integer sourceField : fieldCombo) { + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + + if (targetField == null || targetField.size() == 0) { + newFieldCombo = null; break; } else { - nfs = 
nfs.addFields(props.getForwardFields(input, field)); + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); + } + newFieldCombo = newFieldCombo.addField(targetField.toArray()[0]); } } - if (nfs != null) { - newSet.remove(current); - newSet.add(nfs); + if (newFieldCombo != null) { + newUniqueFieldCombinations.add(newFieldCombo); } } - - GlobalProperties gp = returnProps.clone(); - gp.uniqueFieldCombinations = newSet.isEmpty() ? null : newSet; - return gp; - } - - if (this.partitioning == PartitioningProperty.FULL_REPLICATION) { - return new GlobalProperties(); + if(!newUniqueFieldCombinations.isEmpty()) { + gp.uniqueFieldCombinations = newUniqueFieldCombinations; + } } - return returnProps; + return gp; } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java index 0f60576..85736c6 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java @@ -94,11 +94,11 @@ public class InterestingProperties implements Cloneable public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { InterestingProperties iProps = new InterestingProperties(); - SemanticProperties props = null; - if (node instanceof SingleInputNode) { - props = node.getSemanticProperties(); - } else if 
(node instanceof TwoInputNode) { + SemanticProperties props; + if (node instanceof SingleInputNode || node instanceof TwoInputNode) { props = node.getSemanticProperties(); + } else { + props = new SemanticProperties.EmptySemanticProperties(); } for (RequestedGlobalProperties rgp : this.globalProps) { http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java index b06774c..0555599 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java @@ -20,20 +20,23 @@ package org.apache.flink.compiler.dataproperties; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class represents local properties of the data. A local property is a property that exists * within the data of a single partition. 
*/ public class LocalProperties implements Cloneable { - + + public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class); + public static final LocalProperties EMPTY = new LocalProperties(); // -------------------------------------------------------------------------------------------- @@ -126,101 +129,107 @@ public class LocalProperties implements Cloneable { // -------------------------------------------------------------------------------------------- /** - * Filters these properties by what can be preserved through a user function's constant fields set. - * - * @param props The optimizer node that potentially modifies the properties. - * @param input The input of the node which is relevant. + * Filters these LocalProperties by the fields that are forwarded to the output + * as described by the SemanticProperties. * + * @param props The semantic properties holding information about forwarded fields. + * @param input The index of the input. * @return The filtered LocalProperties */ public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) { - // check, whether the local order is preserved - Ordering no = this.ordering; - FieldList ngf = this.groupedFields; - Set<FieldSet> nuf = this.uniqueFields; - FieldList forwardList = null; if (props == null) { - return new LocalProperties(); + throw new NullPointerException("SemanticProperties may not be null."); } + LocalProperties returnProps = new LocalProperties(); + + // check if sorting is preserved if (this.ordering != null) { - no = new Ordering(); - final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); - for (int i = 0; i < involvedIndexes.size(); i++) { - forwardList = props.getForwardFields(input, involvedIndexes.get(i)) == null ? 
null : props.getForwardFields(input, involvedIndexes.get(i)).toFieldList(); + Ordering newOrdering = new Ordering(); - if (forwardList == null) { - no = null; - ngf = null; - /*if (i == 0) { - no = null; - ngf = null; + for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) { + int sourceField = this.ordering.getInvolvedIndexes().get(i); + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + if (targetField == null || targetField.size() == 0) { + if (i == 0) { + // order fully destroyed + newOrdering = null; + break; } else { - no = this.ordering.createNewOrderingUpToIndex(i); - ngf = no.getInvolvedIndexes(); - }*/ - break; + // order partially preserved + break; + } } else { - no.appendOrdering(forwardList.get(0), this.ordering.getType(i), this.ordering.getOrder(i)); - ngf = no.getInvolvedIndexes(); + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); + } + newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i)); } } + + returnProps.ordering = newOrdering; + if (newOrdering != null) { + returnProps.groupedFields = newOrdering.getInvolvedIndexes(); + } else { + returnProps.groupedFields = null; + } } + // check if grouping is preserved else if (this.groupedFields != null) { - // check, whether the local key grouping is preserved - for (Integer index : this.groupedFields) { - forwardList = props.getForwardFields(input, index) == null ? 
null : props.getForwardFields(input, index).toFieldList(); - if (forwardList == null) { - ngf = null; + FieldList newGroupedFields = new FieldList(); + + for (Integer sourceField : this.groupedFields) { + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + if (targetField == null || targetField.size() == 0) { + newGroupedFields = null; break; - } else if (!forwardList.contains(index)) { - FieldList grouped = new FieldList(); - for (Integer value : ngf.toFieldList()) { - if (value.intValue() == index) { - grouped = grouped.addFields(forwardList); - } else { - grouped = grouped.addField(value); - } + } else { + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); } - ngf = grouped; + newGroupedFields = newGroupedFields.addField(targetField.toArray()[0]); } } + returnProps.groupedFields = newGroupedFields; } if (this.uniqueFields != null) { - HashSet<FieldSet> newSet = new HashSet<FieldSet>(); - newSet.addAll(this.uniqueFields); - for (Iterator<FieldSet> combos = this.uniqueFields.iterator(); combos.hasNext(); ){ - FieldSet current = combos.next(); - FieldSet nfs = new FieldSet(); - for (Integer field : current) { - if (props.getForwardFields(input, field) == null) { - newSet.remove(current); - nfs = null; + Set<FieldSet> newUniqueFields = new HashSet<FieldSet>(); + for (FieldSet fields : this.uniqueFields) { + FieldSet newFields = new FieldSet(); + for (Integer sourceField : fields) { + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + + if (targetField == null || targetField.size() == 0) { + newFields = null; break; } else { - nfs = nfs.addFields(props.getForwardFields(input, field)); + // use any field of target fields for now. 
We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); + } + newFields = newFields.addField(targetField.toArray()[0]); } } - if (nfs != null) { - newSet.remove(current); - newSet.add(nfs); + if (newFields != null) { + newUniqueFields.add(newFields); } } - nuf = newSet.isEmpty() ? null : newSet; + if (!newUniqueFields.isEmpty()) { + returnProps.uniqueFields = newUniqueFields; + } else { + returnProps.uniqueFields = null; + } } - if (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) { - return this; - } else { - LocalProperties lp = new LocalProperties(); - lp.ordering = no; - lp.groupedFields = ngf; - lp.uniqueFields = nuf; - return lp; - } + return returnProps; } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java index 1320f82..de56c37 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SemanticProperties; -import 
org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.plan.Channel; @@ -189,73 +188,65 @@ public final class RequestedGlobalProperties implements Cloneable { this.customPartitioner = null; } - public void setPartitioningFields(FieldSet partitioned) { - this.partitioningFields = partitioned; - } - - public void setPartitioningFields(FieldSet fields, PartitioningProperty partitioning) { - this.partitioningFields = fields; - this.partitioning = partitioning; - } - - public void setOrdering(Ordering newOrdering) { - this.ordering = newOrdering; - } - /** - * Filters these properties by what can be preserved by the given node when propagated down + * Filters these properties by what can be preserved by the given SemanticProperties when propagated down * to the given input. * - * @param props The node representing the contract. - * @param input The index of the input. + * @param props The SemanticProperties which define which fields are preserved. + * @param input The index of the operator's input. * @return The filtered RequestedGlobalProperties */ public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { - FieldList sourceList; - RequestedGlobalProperties returnProps = null; + RequestedGlobalProperties returnProps = new RequestedGlobalProperties(); + // no semantic properties available. All global properties are filtered. 
if (props == null) { - return null; + throw new NullPointerException("SemanticProperties may not be null."); } - // check if partitioning survives - if (this.ordering != null) { - Ordering no = new Ordering(); - returnProps = new RequestedGlobalProperties(); - returnProps.setPartitioningFields(new FieldSet(), this.partitioning); + RequestedGlobalProperties rgProp = new RequestedGlobalProperties(); - for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) { - int value = this.ordering.getInvolvedIndexes().get(index); - sourceList = props.getSourceField(input, value) == null ? null : props.getSourceField(input, value).toFieldList(); - if (sourceList != null) { - no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index)); - } else { - return null; + switch(this.partitioning) { + case FULL_REPLICATION: + case FORCED_REBALANCED: + case CUSTOM_PARTITIONING: + case RANDOM: + // make sure that certain properties are not pushed down + return null; + case HASH_PARTITIONED: + case ANY_PARTITIONING: + FieldSet newFields = new FieldSet(); + for (Integer targetField : this.partitioningFields) { + int sourceField = props.getForwardingSourceField(input, targetField); + if (sourceField >= 0) { + newFields = newFields.addField(sourceField); + } else { + // partial partitionings are not preserved to avoid skewed partitioning + return null; + } } - } - returnProps.setOrdering(no); - } else if (this.partitioningFields != null) { - returnProps = new RequestedGlobalProperties(); - returnProps.setPartitioningFields(new FieldSet(), this.partitioning); - for (Integer index : this.partitioningFields) { - sourceList = props.getSourceField(input, index) == null ? 
null : props.getSourceField(input, index).toFieldList(); - if (sourceList != null) { - returnProps.setPartitioningFields(returnProps.getPartitionedFields().addFields(sourceList), this.partitioning); - } else { - return null; + rgProp.partitioning = this.partitioning; + rgProp.partitioningFields = newFields; + return rgProp; + case RANGE_PARTITIONED: + // range partitioning + Ordering newOrdering = new Ordering(); + for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) { + int value = this.ordering.getInvolvedIndexes().get(i); + int sourceField = props.getForwardingSourceField(input, value); + if (sourceField >= 0) { + newOrdering.appendOrdering(sourceField, this.ordering.getType(i), this.ordering.getOrder(i)); + } else { + return null; + } } - } - } - // make sure that certain properties are not pushed down - final PartitioningProperty partitioning = this.partitioning; - if (partitioning == PartitioningProperty.FULL_REPLICATION || - partitioning == PartitioningProperty.FORCED_REBALANCED || - partitioning == PartitioningProperty.CUSTOM_PARTITIONING) - { - return null; + rgProp.partitioning = this.partitioning; + rgProp.ordering = newOrdering; + rgProp.dataDistribution = this.dataDistribution; + return rgProp; + default: + throw new RuntimeException("Unknown partitioning type encountered."); } - - return returnProps; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java index a5bf118..1f69959 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java +++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java @@ -23,7 +23,6 @@ import java.util.Arrays; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SemanticProperties; -import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.plan.Channel; import org.apache.flink.compiler.util.Utils; @@ -135,50 +134,50 @@ public class RequestedLocalProperties implements Cloneable { // -------------------------------------------------------------------------------------------- /** - * Filters these properties by what can be preserved through a user function's constant fields set. - * Since interesting properties are filtered top-down, anything that partially destroys the ordering - * makes the properties uninteresting. - * - * @param props The optimizer node that potentially modifies the properties. - * @param input The input of the node which is relevant. + * Filters these properties by what can be preserved by the given SemanticProperties when propagated down + * to the given input. * + * @param props The SemanticProperties which define which fields are preserved. + * @param input The index of the operator's input. 
* @return The filtered RequestedLocalProperties */ public RequestedLocalProperties filterBySemanticProperties(SemanticProperties props, int input) { - FieldList sourceList; - RequestedLocalProperties returnProps = this; + // no semantic properties, all local properties are filtered if (props == null) { - return null; + throw new NullPointerException("SemanticProperties may not be null."); } if (this.ordering != null) { - Ordering no = new Ordering(); - returnProps = this.clone(); - for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) { - int value = this.ordering.getInvolvedIndexes().get(index); - sourceList = props.getSourceField(input, value) == null ? null : props.getSourceField(input, value).toFieldList(); - if (sourceList != null) { - no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index)); + Ordering newOrdering = new Ordering(); + + for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) { + int targetField = this.ordering.getInvolvedIndexes().get(i); + int sourceField = props.getForwardingSourceField(input, targetField); + if (sourceField >= 0) { + newOrdering.appendOrdering(sourceField, this.ordering.getType(i), this.ordering.getOrder(i)); } else { return null; } } - returnProps.setOrdering(no); + return new RequestedLocalProperties(newOrdering); + } else if (this.groupedFields != null) { - returnProps = this.clone(); - returnProps.setGroupedFields(new FieldList()); + FieldSet newGrouping = new FieldSet(); + // check, whether the local key grouping is preserved - for (Integer index : this.groupedFields) { - sourceList = props.getSourceField(input, index) == null ? 
null : props.getSourceField(input, index).toFieldList(); - if (sourceList != null) { - returnProps.setGroupedFields(returnProps.getGroupedFields().addFields(sourceList)); + for (Integer targetField : this.groupedFields) { + int sourceField = props.getForwardingSourceField(input, targetField); + if (sourceField >= 0) { + newGrouping = newGrouping.addField(sourceField); } else { return null; } } + return new RequestedLocalProperties(newGrouping); + } else { + return null; } - return returnProps; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java index ab083ef..5da215f 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java @@ -525,7 +525,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im Integer pos = entry.getKey(); SemanticProperties sprops = optNode.getSemanticProperties(); - if (sprops != null && sprops.getForwardFields(input, pos) != null && sprops.getForwardFields(input,pos).contains(pos)) { + if (sprops != null && sprops.getForwardingTargetFields(input, pos) != null && sprops.getForwardingTargetFields(input, pos).contains(pos)) { targetSchema.addType(pos, entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java 
b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java index 93a1fc5..c3a4c3a 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java @@ -250,7 +250,7 @@ public class DOPChangeTest extends CompilerTestBase { // submit the plan to the compiler OptimizedPlan oPlan = compileNoStats(plan); - + // check the optimized Plan // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same @@ -258,10 +258,7 @@ public class DOPChangeTest extends CompilerTestBase { SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.NONE, red2Node.getInput().getLocalStrategy()); - Assert.assertEquals("The Map 2 Node has an invalid local strategy.", LocalStrategy.SORT, map2Node.getInput().getLocalStrategy()); + Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java index 87af91b..9445198 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java @@ -34,7 
+34,7 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichJoinFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.compiler.plan.BulkIterationPlanNode; @@ -376,14 +376,14 @@ public class IterationsCompilerTest extends CompilerTestBase { } } - @ConstantFields("0") + @ForwardedFields("0") public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> { @Override public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {} } - @ConstantFields("0") + @ForwardedFields("0") public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> { @Override
