[FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked semantic 
annotations for functions.
- Renamed constantField annotations to forwardedFields annotations
- Forwarded fields can be defined for (nested) tuples, POJOs, and case classes
- Added semantic function information to example programs

This closes #311


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de8e066c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de8e066c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de8e066c

Branch: refs/heads/master
Commit: de8e066ccbd0a31e5746bc0bee524a48bba3a552
Parents: 78f41e9
Author: Fabian Hueske <[email protected]>
Authored: Wed Dec 17 18:59:14 2014 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Wed Jan 28 01:39:01 2015 +0100

----------------------------------------------------------------------
 .../spargel/java/VertexCentricIteration.java    |    2 +-
 .../spargel/java/SpargelTranslationTest.java    |    8 +-
 .../aggregation/ComparableAggregator.java       |    5 +-
 .../api/function/aggregation/SumAggregator.java |    4 +-
 .../dag/AbstractPartialSolutionNode.java        |    3 +-
 .../flink/compiler/dag/BinaryUnionNode.java     |   37 +-
 .../flink/compiler/dag/BulkIterationNode.java   |    3 +-
 .../apache/flink/compiler/dag/DataSinkNode.java |    3 +-
 .../flink/compiler/dag/DataSourceNode.java      |    3 +-
 .../apache/flink/compiler/dag/FilterNode.java   |    6 +-
 .../flink/compiler/dag/OptimizerNode.java       |   28 -
 .../flink/compiler/dag/PartitionNode.java       |    7 +-
 .../flink/compiler/dag/UnaryOperatorNode.java   |    4 +-
 .../compiler/dag/WorksetIterationNode.java      |    3 +-
 .../dataproperties/GlobalProperties.java        |  156 +-
 .../dataproperties/InterestingProperties.java   |    8 +-
 .../dataproperties/LocalProperties.java         |  139 +-
 .../RequestedGlobalProperties.java              |   99 +-
 .../RequestedLocalProperties.java               |   49 +-
 .../postpass/GenericFlatTypePostPass.java       |    2 +-
 .../apache/flink/compiler/DOPChangeTest.java    |    7 +-
 .../flink/compiler/IterationsCompilerTest.java  |    6 +-
 .../compiler/SemanticPropOptimizerTest.java     |  941 ---------
 .../SemanticPropertiesAPIToPlanTest.java        |  182 ++
 .../flink/compiler/SortPartialReuseTest.java    |    8 +-
 .../CoGroupCustomPartitioningTest.java          |    4 +-
 .../JoinCustomPartitioningTest.java             |    4 +-
 .../GlobalPropertiesFilteringTest.java          |  418 +++-
 .../GlobalPropertiesMatchingTest.java           |    3 +-
 .../GlobalPropertiesPushdownTest.java           |   29 +-
 .../LocalPropertiesFilteringTest.java           |  376 ++++
 .../dataproperties/MockDistribution.java        |   49 +
 .../dataproperties/MockPartitioner.java         |    5 +-
 .../RequestedGlobalPropertiesFilteringTest.java |  433 +++++
 .../RequestedLocalPropertiesFilteringTest.java  |  248 +++
 .../WorksetIterationsJavaApiCompilerTest.java   |    6 +-
 .../api/common/operators/DualInputOperator.java |    2 +-
 .../operators/DualInputSemanticProperties.java  |  329 +---
 .../flink/api/common/operators/Ordering.java    |    2 +-
 .../common/operators/SemanticProperties.java    |  103 +-
 .../common/operators/SingleInputOperator.java   |    2 +-
 .../SingleInputSemanticProperties.java          |  239 +--
 .../operators/base/PartitionOperatorBase.java   |    2 +-
 .../api/common/typeutils/CompositeType.java     |   49 +-
 .../DualInputSemanticPropertiesTest.java        |  255 +++
 .../SingleInputSemanticPropertiesTest.java      |  183 ++
 .../flink/examples/java/clustering/KMeans.java  |    9 +-
 .../java/graph/ConnectedComponents.java         |   16 +-
 .../examples/java/graph/EnumTrianglesBasic.java |    3 +
 .../examples/java/graph/EnumTrianglesOpt.java   |    4 +
 .../examples/java/graph/PageRankBasic.java      |    6 +-
 .../java/graph/TransitiveClosureNaive.java      |    6 +-
 .../examples/java/ml/LinearRegression.java      |    3 +
 .../java/relational/WebLogAnalysis.java         |    2 +
 .../examples/scala/clustering/KMeans.scala      |    8 +-
 .../scala/graph/ConnectedComponents.scala       |    6 +-
 .../examples/scala/graph/DeltaPageRank.scala    |    3 +-
 .../scala/graph/EnumTrianglesBasic.scala        |    3 +
 .../examples/scala/graph/EnumTrianglesOpt.scala |    5 +-
 .../examples/scala/graph/PageRankBasic.scala    |    4 +-
 .../scala/graph/TransitiveClosureNaive.scala    |    6 +-
 .../scala/relational/WebLogAnalysis.scala       |    4 +-
 .../api/java/functions/FunctionAnnotation.java  |  586 +++---
 .../api/java/functions/SemanticPropUtil.java    |  697 ++++---
 .../flink/api/java/operators/CrossOperator.java |   13 +-
 .../api/java/operators/DistinctOperator.java    |    3 +-
 .../flink/api/java/operators/JoinOperator.java  |   39 +-
 .../apache/flink/api/java/operators/Keys.java   |    8 +-
 .../api/java/operators/ProjectOperator.java     |    3 +-
 .../java/operators/SingleInputUdfOperator.java  |   98 +-
 .../api/java/operators/TwoInputUdfOperator.java |  188 +-
 .../translation/KeyExtractingMapper.java        |    3 +-
 .../translation/KeyRemovingMapper.java          |    3 +-
 .../translation/PlanFilterOperator.java         |    3 +-
 .../record/functions/FunctionAnnotation.java    |  233 +--
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  171 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  169 +-
 .../java/functions/SemanticPropUtilTest.java    | 1813 ++++++++++++------
 .../SemanticPropertiesProjectionTest.java       |  383 ++--
 .../SemanticPropertiesTranslationTest.java      |  798 +++++---
 .../flink/api/java/operators/KeysTest.java      |   21 +-
 .../record/CoGroupWrappingFunctionTest.java     |   19 +-
 .../java/record/ReduceWrappingFunctionTest.java |   19 +-
 .../type/extractor/PojoTypeExtractionTest.java  |   18 +-
 .../java/type/extractor/TypeExtractorTest.java  |   14 +-
 .../api/java/typeutils/CompositeTypeTest.java   |  178 ++
 .../typeutils/runtime/PojoSerializerTest.java   |    2 +-
 .../org/apache/flink/api/scala/DataSet.scala    |   18 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  165 +-
 .../CoGroupConnectedComponentsSecondITCase.java |    8 +-
 .../SemanticPropertiesTranslationTest.scala     |  152 +-
 .../scala/types/TypeInformationGenTest.scala    |  139 ++
 92 files changed, 6689 insertions(+), 3817 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index a3a19d3..4f84467 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -350,7 +350,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                }
 
                // let the operator know that we preserve the key field
-               updates.withConstantSetFirst("0").withConstantSetSecond("0");
+               
updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
                
                return iteration.closeWith(updates, updates);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
 
b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
index 96692ef..b31618c 100644
--- 
a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
+++ 
b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
@@ -100,8 +100,8 @@ public class SpargelTranslationTest {
                        
                        // validate that the semantic properties are set as 
they should
                        TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
-                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
+                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 
0).contains(0));
+                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 
0).contains(0));
                        
                        TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
                        
@@ -179,8 +179,8 @@ public class SpargelTranslationTest {
                        
                        // validate that the semantic properties are set as 
they should
                        TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
-                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
+                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 
0).contains(0));
+                       
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 
0).contains(0));
                        
                        TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
index 7ea7ba1..226c45a 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.function.aggregation;
 
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -189,9 +188,7 @@ public abstract class ComparableAggregator<T> extends 
AggregationFunction<T> {
                        @SuppressWarnings("unchecked")
                        CompositeType<T> cType = (CompositeType<T>) typeInfo;
 
-                       List<FlatFieldDescriptor> fieldDescriptors = new 
ArrayList<FlatFieldDescriptor>();
-                       cType.getKey(field, 0, fieldDescriptors);
-
+                       List<FlatFieldDescriptor> fieldDescriptors = 
cType.getFlatFields(field);
                        int logicalKeyPosition = 
fieldDescriptors.get(0).getPosition();
 
                        if (cType instanceof PojoTypeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
index 384b4f6..142028b 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.function.aggregation;
 
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -138,8 +137,7 @@ public abstract class SumAggregator {
                        @SuppressWarnings("unchecked")
                        CompositeType<T> cType = (CompositeType<T>) type;
 
-                       List<FlatFieldDescriptor> fieldDescriptors = new 
ArrayList<FlatFieldDescriptor>();
-                       cType.getKey(field, 0, fieldDescriptors);
+                       List<FlatFieldDescriptor> fieldDescriptors = 
cType.getFlatFields(field);
 
                        int logicalKeyPosition = 
fieldDescriptors.get(0).getPosition();
                        Class<?> keyClass = 
fieldDescriptors.get(0).getType().getTypeClass();

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
index c54076d..d996fe9 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
 import org.apache.flink.compiler.plan.PlanNode;
@@ -88,7 +89,7 @@ public abstract class AbstractPartialSolutionNode extends 
OptimizerNode {
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               return null;
+               return new EmptySemanticProperties();
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
index 5d805a9..bbe9563 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
@@ -24,9 +24,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -255,9 +255,7 @@ public class BinaryUnionNode extends TwoInputNode {
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               DualInputSemanticProperties sprops = new 
DualInputSemanticProperties();
-               sprops.setAllFieldsConstant(true);
-               return sprops;
+               return new UnionSemanticProperties();
        }
        
        @Override
@@ -270,4 +268,35 @@ public class BinaryUnionNode extends TwoInputNode {
                this.estimatedOutputSize = in1.estimatedOutputSize > 0 && 
in2.estimatedOutputSize > 0 ?
                        in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
        }
+
+       public static class UnionSemanticProperties implements 
SemanticProperties {
+
+               @Override
+               public FieldSet getForwardingTargetFields(int input, int 
sourceField) {
+                       if (input != 0 && input != 1) {
+                               throw new IndexOutOfBoundsException("Invalid 
input index for binary union node.");
+                       }
+
+                       return new FieldSet(sourceField);
+               }
+
+               @Override
+               public int getForwardingSourceField(int input, int targetField) 
{
+                       if (input != 0 && input != 1) {
+                               throw new IndexOutOfBoundsException();
+                       }
+
+                       return targetField;
+               }
+
+               @Override
+               public FieldSet getReadFields(int input) {
+                       if (input != 0 && input != 1) {
+                               throw new IndexOutOfBoundsException();
+                       }
+
+                       return FieldSet.EMPTY_SET;
+               }
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
index 18ac4c2..43b5799 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.compiler.CompilerException;
@@ -186,7 +187,7 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               return null;
+               return new EmptySemanticProperties();
        }
        
        protected void readStubAnnotations() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
index 2d4fb30..aa80451 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -234,7 +235,7 @@ public class DataSinkNode extends OptimizerNode {
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               return null;
+               return new EmptySemanticProperties();
        }
                
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index 752a763..10c77ca 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -195,7 +196,7 @@ public class DataSourceNode extends OptimizerNode {
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               return null;
+               return new EmptySemanticProperties();
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
index 140734c..33b2049 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
@@ -52,11 +52,7 @@ public class FilterNode extends SingleInputNode {
 
        @Override
        public SemanticProperties getSemanticProperties() {
-
-               SingleInputSemanticProperties sprops = new 
SingleInputSemanticProperties();
-               sprops.setAllFieldsConstant(true);
-
-               return sprops;
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
index 75becf8..b717560 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
@@ -666,35 +666,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        // 
------------------------------------------------------------------------
        // Access of stub annotations
        // 
------------------------------------------------------------------------
-       
-       /**
-        * Returns the key columns for the specific input, if all keys are 
preserved
-        * by this node. Null, otherwise.
-        */
-       protected int[] getConstantKeySet(int input) {
-               Operator<?> contract = getPactContract();
-               if (contract instanceof AbstractUdfOperator<?, ?>) {
-                       AbstractUdfOperator<?, ?> abstractPact = 
(AbstractUdfOperator<?, ?>) contract;
-                       int[] keyColumns = abstractPact.getKeyColumns(input);
-                       if (keyColumns != null) {
-                               if (keyColumns.length == 0) {
-                                       return null;
-                               }
-                               for (int keyColumn : keyColumns) {
-                                       FieldSet fs = getSemanticProperties() 
== null ? null : getSemanticProperties().getForwardFields(input, keyColumn);
 
-                                       if (fs == null) {
-                                               return null;
-                                       } else if (!fs.contains(keyColumn)) {
-                                               return null;
-                                       }
-                               }
-                               return keyColumns;
-                       }
-               }
-               return null;
-       }
-       
        /**
         * An optional method where nodes can describe which fields will be 
unique in their output.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
index 53b5dd9..75961bf 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.operators.util.FieldSet;
@@ -74,9 +76,8 @@ public class PartitionNode extends SingleInputNode {
        }
        
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               // Partition does not change any data
-               return true;
+       public SemanticProperties getSemanticProperties() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
index aaf0a10..90ba480 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
@@ -59,9 +59,7 @@ public class UnaryOperatorNode extends SingleInputNode {
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               SingleInputSemanticProperties sprops = new 
SingleInputSemanticProperties();
-               sprops.setAllFieldsConstant(true);
-               return sprops;
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index 95fc066..0557633 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
@@ -222,7 +223,7 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               return null;
+               return new EmptySemanticProperties();
        }
 
        protected void readStubAnnotations() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index 4fe632a..fb1f1a2 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -19,7 +19,6 @@
 package org.apache.flink.compiler.dataproperties;
 
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -32,6 +31,8 @@ import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class represents global properties of the data at a certain point in 
the plan.
@@ -41,6 +42,8 @@ import 
org.apache.flink.runtime.operators.shipping.ShipStrategyType;
  * or an FieldSet with the hash partitioning columns.
  */
 public class GlobalProperties implements Cloneable {
+
+       public static final Logger LOG = 
LoggerFactory.getLogger(GlobalProperties.class);
        
        private PartitioningProperty partitioning;      // the type partitioning
        
@@ -213,18 +216,6 @@ public class GlobalProperties implements Cloneable {
                }
        }
 
-       public Ordering getOrdering() {
-               return this.ordering;
-       }
-
-       public void setOrdering(Ordering ordering) {
-               this.ordering = ordering;
-       }
-
-       public void setPartitioningFields(FieldList partitioningFields) {
-               this.partitioningFields = partitioningFields;
-       }
-
        public boolean isFullyReplicated() {
                return this.partitioning == 
PartitioningProperty.FULL_REPLICATION;
        }
@@ -246,83 +237,114 @@ public class GlobalProperties implements Cloneable {
        }
 
        /**
-        * Filters these GlobalProperties by the fields that are constant or 
forwarded to another output field.
+        * Filters these GlobalProperties by the fields that are forwarded to 
the output
+        * as described by the SemanticProperties.
         *
-        * @param props The node representing the contract.
+        * @param props The semantic properties holding information about 
forwarded fields.
         * @param input The index of the input.
         * @return The filtered GlobalProperties
         */
        public GlobalProperties filterBySemanticProperties(SemanticProperties 
props, int input) {
-               // check if partitioning survives
-               FieldList forwardFields = null;
-               GlobalProperties returnProps = this;
 
                if (props == null) {
-                       return new GlobalProperties();
+                       throw new NullPointerException("SemanticProperties may 
not be null.");
                }
 
-               if (this.ordering != null) {
-                       Ordering no = new Ordering();
-                       for (int index : this.ordering.getInvolvedIndexes()) {
-                               forwardFields = props.getForwardFields(input, 
index) == null ? null: props.getForwardFields(input, index).toFieldList();
-                               if (forwardFields == null) {
-                                       returnProps = new GlobalProperties();
-                                       no = null;
-                                       break;
-                               } else {
-                                       returnProps = returnProps == this ? 
this.clone() : returnProps;
-                                       for (int i = 0; i < 
forwardFields.size(); i++) {
-                                               
no.appendOrdering(forwardFields.get(i), this.ordering.getType(index), 
this.ordering.getOrder(index));
+               GlobalProperties gp = new GlobalProperties();
+
+               // filter partitioning
+               switch(this.partitioning) {
+                       case FULL_REPLICATION:
+                               return gp;
+                       case RANGE_PARTITIONED:
+                               // check if ordering is preserved
+                               Ordering newOrdering = new Ordering();
+                               for (int i = 0; i < 
this.ordering.getInvolvedIndexes().size(); i++) {
+                                       int sourceField = 
this.ordering.getInvolvedIndexes().get(i);
+                                       FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
+
+                                       if (targetField == null || 
targetField.size() == 0) {
+                                               // partitioning is destroyed
+                                               newOrdering = null;
+                                               break;
+                                       } else {
+                                               // use any field of target 
fields for now. We should use something like field equivalence sets in the 
future.
+                                               if(targetField.size() > 1) {
+                                                       LOG.warn("Found that a 
field is forwarded to more than one target field in " +
+                                                                       
"semantic forwarded field information. Will only use the field with the lowest 
index.");
+                                               }
+                                               
newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), 
this.ordering.getOrder(i));
                                        }
                                }
-                               returnProps.setOrdering(no);
-                       }
-               }
-               if (this.partitioningFields != null) {
-                       returnProps = returnProps == this ? this.clone() : 
returnProps;
-                       returnProps.setPartitioningFields(new FieldList());
+                               if(newOrdering != null) {
+                                       gp.partitioning = 
PartitioningProperty.RANGE_PARTITIONED;
+                                       gp.ordering = newOrdering;
+                                       gp.partitioningFields = 
newOrdering.getInvolvedIndexes();
+                               }
+                               break;
+                       case HASH_PARTITIONED:
+                       case ANY_PARTITIONING:
+                       case CUSTOM_PARTITIONING:
+                               FieldList newPartitioningFields = new 
FieldList();
+                               for (int sourceField : this.partitioningFields) 
{
+                                       FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
 
-                       for (int index : this.partitioningFields) {
-                               forwardFields = props.getForwardFields(input, 
index) == null ? null: props.getForwardFields(input, index).toFieldList();
-                               if (forwardFields == null) {
-                                       returnProps = new GlobalProperties();
-                                       break;
-                               } else  {
-                                       
returnProps.setPartitioningFields(returnProps.getPartitioningFields().addFields(forwardFields));
+                                       if (targetField == null || 
targetField.size() == 0) {
+                                               newPartitioningFields = null;
+                                               break;
+                                       } else {
+                                               // use any field of target 
fields for now.  We should use something like field equivalence sets in the 
future.
+                                               if(targetField.size() > 1) {
+                                                       LOG.warn("Found that a 
field is forwarded to more than one target field in " +
+                                                                       
"semantic forwarded field information. Will only use the field with the lowest 
index.");
+                                               }
+                                               newPartitioningFields = 
newPartitioningFields.addField(targetField.toArray()[0]);
+                                       }
                                }
-                       }
+                               if(newPartitioningFields != null) {
+                                       gp.partitioning = this.partitioning;
+                                       gp.partitioningFields = 
newPartitioningFields;
+                                       gp.customPartitioner = 
this.customPartitioner;
+                               }
+                               break;
+                       case FORCED_REBALANCED:
+                       case RANDOM:
+                               gp.partitioning = this.partitioning;
+                               break;
+                       default:
+                               throw new RuntimeException("Unknown 
partitioning type.");
                }
+
+               // filter unique field combinations
                if (this.uniqueFieldCombinations != null) {
-                       HashSet<FieldSet> newSet = new HashSet<FieldSet>();
-                       newSet.addAll(this.uniqueFieldCombinations);
-                       for (Iterator<FieldSet> combos = 
this.uniqueFieldCombinations.iterator(); combos.hasNext(); ){
-                               FieldSet current = combos.next();
-                               FieldSet nfs = new FieldSet();
-                               for (Integer field : current) {
-                                       if (props.getForwardFields(input, 
field) == null) {
-                                               newSet.remove(current);
-                                               nfs = null;
+                       Set<FieldSet> newUniqueFieldCombinations = new 
HashSet<FieldSet>();
+                       for (FieldSet fieldCombo : 
this.uniqueFieldCombinations) {
+                               FieldSet newFieldCombo = new FieldSet();
+                               for (Integer sourceField : fieldCombo) {
+                                       FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
+
+                                       if (targetField == null || 
targetField.size() == 0) {
+                                               newFieldCombo = null;
                                                break;
                                        } else {
-                                               nfs = 
nfs.addFields(props.getForwardFields(input, field));
+                                               // use any field of target 
fields for now.  We should use something like field equivalence sets in the 
future.
+                                               if(targetField.size() > 1) {
+                                                       LOG.warn("Found that a 
field is forwarded to more than one target field in " +
+                                                                       
"semantic forwarded field information. Will only use the field with the lowest 
index.");
+                                               }
+                                               newFieldCombo = 
newFieldCombo.addField(targetField.toArray()[0]);
                                        }
                                }
-                               if (nfs != null) {
-                                       newSet.remove(current);
-                                       newSet.add(nfs);
+                               if (newFieldCombo != null) {
+                                       
newUniqueFieldCombinations.add(newFieldCombo);
                                }
                        }
-
-                       GlobalProperties gp = returnProps.clone();
-                       gp.uniqueFieldCombinations = newSet.isEmpty() ? null : 
newSet;
-                       return gp;
-               }
-
-               if (this.partitioning == PartitioningProperty.FULL_REPLICATION) 
{
-                       return new GlobalProperties();
+                       if(!newUniqueFieldCombinations.isEmpty()) {
+                               gp.uniqueFieldCombinations = 
newUniqueFieldCombinations;
+                       }
                }
 
-               return returnProps;
+               return gp;
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
index 0f60576..85736c6 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
@@ -94,11 +94,11 @@ public class InterestingProperties implements Cloneable
        public InterestingProperties filterByCodeAnnotations(OptimizerNode 
node, int input)
        {
                InterestingProperties iProps = new InterestingProperties();
-               SemanticProperties props = null;
-               if (node instanceof SingleInputNode) {
-                       props = node.getSemanticProperties();
-               } else if (node instanceof TwoInputNode) {
+               SemanticProperties props;
+               if (node instanceof SingleInputNode || node instanceof 
TwoInputNode) {
                        props = node.getSemanticProperties();
+               } else {
+                       props = new 
SemanticProperties.EmptySemanticProperties();
                }
 
                for (RequestedGlobalProperties rgp : this.globalProps) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
index b06774c..0555599 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
@@ -20,20 +20,23 @@
 package org.apache.flink.compiler.dataproperties;
 
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class represents local properties of the data. A local property is a 
property that exists
  * within the data of a single partition.
  */
 public class LocalProperties implements Cloneable {
-       
+
+       public static final Logger LOG = 
LoggerFactory.getLogger(GlobalProperties.class);
+
        public static final LocalProperties EMPTY = new LocalProperties();
        
        // 
--------------------------------------------------------------------------------------------
@@ -126,101 +129,107 @@ public class LocalProperties implements Cloneable {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Filters these properties by what can be preserved through a user 
function's constant fields set.
-        *
-        * @param props The optimizer node that potentially modifies the 
properties.
-        * @param input The input of the node which is relevant.
+        * Filters these LocalProperties by the fields that are forwarded to 
the output
+        * as described by the SemanticProperties.
         *
+        * @param props The semantic properties holding information about 
forwarded fields.
+        * @param input The index of the input.
         * @return The filtered LocalProperties
         */
        public LocalProperties filterBySemanticProperties(SemanticProperties 
props, int input) {
-               // check, whether the local order is preserved
-               Ordering no = this.ordering;
-               FieldList ngf = this.groupedFields;
-               Set<FieldSet> nuf = this.uniqueFields;
-               FieldList forwardList = null;
 
                if (props == null) {
-                       return new LocalProperties();
+                       throw new NullPointerException("SemanticProperties may 
not be null.");
                }
 
+               LocalProperties returnProps = new LocalProperties();
+
+               // check if sorting is preserved
                if (this.ordering != null) {
-                       no = new Ordering();
-                       final FieldList involvedIndexes = 
this.ordering.getInvolvedIndexes();
-                       for (int i = 0; i < involvedIndexes.size(); i++) {
-                               forwardList = props.getForwardFields(input, 
involvedIndexes.get(i)) == null ? null : props.getForwardFields(input, 
involvedIndexes.get(i)).toFieldList();
+                       Ordering newOrdering = new Ordering();
 
-                               if (forwardList == null) {
-                                       no = null;
-                                       ngf = null;
-                                       /*if (i == 0) {
-                                               no = null;
-                                               ngf = null;
+                       for (int i = 0; i < 
this.ordering.getInvolvedIndexes().size(); i++) {
+                               int sourceField = 
this.ordering.getInvolvedIndexes().get(i);
+                               FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
+                               if (targetField == null || targetField.size() 
== 0) {
+                                       if (i == 0) {
+                                               // order fully destroyed
+                                               newOrdering = null;
+                                               break;
                                        } else {
-                                               no = 
this.ordering.createNewOrderingUpToIndex(i);
-                                               ngf = no.getInvolvedIndexes();
-                                       }*/
-                                       break;
+                                               // order partially preserved
+                                               break;
+                                       }
                                } else {
-                                       no.appendOrdering(forwardList.get(0), 
this.ordering.getType(i), this.ordering.getOrder(i));
-                                       ngf = no.getInvolvedIndexes();
+                                       // use any field of target fields for 
now.  We should use something like field equivalence sets in the future.
+                                       if(targetField.size() > 1) {
+                                               LOG.warn("Found that a field is 
forwarded to more than one target field in " +
+                                                               "semantic 
forwarded field information. Will only use the field with the lowest index.");
+                                       }
+                                       
newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), 
this.ordering.getOrder(i));
                                }
                        }
+
+                       returnProps.ordering = newOrdering;
+                       if (newOrdering != null) {
+                               returnProps.groupedFields = 
newOrdering.getInvolvedIndexes();
+                       } else {
+                               returnProps.groupedFields = null;
+                       }
                }
+               // check if grouping is preserved
                else if (this.groupedFields != null) {
-                       // check, whether the local key grouping is preserved
-                       for (Integer index : this.groupedFields) {
-                               forwardList = props.getForwardFields(input, 
index) == null ? null : props.getForwardFields(input, index).toFieldList();
-                               if (forwardList == null) {
-                                       ngf = null;
+                       FieldList newGroupedFields = new FieldList();
+
+                       for (Integer sourceField : this.groupedFields) {
+                               FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
+                               if (targetField == null || targetField.size() 
== 0) {
+                                       newGroupedFields = null;
                                        break;
-                               } else if (!forwardList.contains(index)) {
-                                       FieldList grouped = new FieldList();
-                                       for (Integer value : ngf.toFieldList()) 
{
-                                               if (value.intValue() == index) {
-                                                       grouped = 
grouped.addFields(forwardList);
-                                               } else {
-                                                       grouped = 
grouped.addField(value);
-                                               }
+                               } else {
+                                       // use any field of target fields for 
now.  We should use something like field equivalence sets in the future.
+                                       if(targetField.size() > 1) {
+                                               LOG.warn("Found that a field is 
forwarded to more than one target field in " +
+                                                               "semantic 
forwarded field information. Will only use the field with the lowest index.");
                                        }
-                                       ngf = grouped;
+                                       newGroupedFields = 
newGroupedFields.addField(targetField.toArray()[0]);
                                }
                        }
+                       returnProps.groupedFields = newGroupedFields;
                }
 
                if (this.uniqueFields != null) {
-                       HashSet<FieldSet> newSet = new HashSet<FieldSet>();
-                       newSet.addAll(this.uniqueFields);
-                       for (Iterator<FieldSet> combos = 
this.uniqueFields.iterator(); combos.hasNext(); ){
-                               FieldSet current = combos.next();
-                               FieldSet nfs = new FieldSet();
-                               for (Integer field : current) {
-                                       if (props.getForwardFields(input, 
field) == null) {
-                                               newSet.remove(current);
-                                               nfs = null;
+                       Set<FieldSet> newUniqueFields = new HashSet<FieldSet>();
+                       for (FieldSet fields : this.uniqueFields) {
+                               FieldSet newFields = new FieldSet();
+                               for (Integer sourceField : fields) {
+                                       FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
+
+                                       if (targetField == null || 
targetField.size() == 0) {
+                                               newFields = null;
                                                break;
                                        } else {
-                                               nfs = 
nfs.addFields(props.getForwardFields(input, field));
+                                               // use any field of target 
fields for now.  We should use something like field equivalence sets in the 
future.
+                                               if(targetField.size() > 1) {
+                                                       LOG.warn("Found that a 
field is forwarded to more than one target field in " +
+                                                                       
"semantic forwarded field information. Will only use the field with the lowest 
index.");
+                                               }
+                                               newFields = 
newFields.addField(targetField.toArray()[0]);
                                        }
                                }
-                               if (nfs != null) {
-                                       newSet.remove(current);
-                                       newSet.add(nfs);
+                               if (newFields != null) {
+                                       newUniqueFields.add(newFields);
                                }
                        }
 
-                       nuf = newSet.isEmpty() ? null : newSet;
+                       if (!newUniqueFields.isEmpty()) {
+                               returnProps.uniqueFields = newUniqueFields;
+                       } else {
+                               returnProps.uniqueFields = null;
+                       }
                }
 
-               if (no == this.ordering && ngf == this.groupedFields && nuf == 
this.uniqueFields) {
-                       return this;
-               } else {
-                       LocalProperties lp = new LocalProperties();
-                       lp.ordering = no;
-                       lp.groupedFields = ngf;
-                       lp.uniqueFields = nuf;
-                       return lp;
-               }
+               return returnProps;
        }
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index 1320f82..de56c37 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
@@ -189,73 +188,65 @@ public final class RequestedGlobalProperties implements 
Cloneable {
                this.customPartitioner = null;
        }
 
-       public void setPartitioningFields(FieldSet partitioned) {
-               this.partitioningFields = partitioned;
-       }
-
-       public void setPartitioningFields(FieldSet fields, PartitioningProperty 
partitioning) {
-               this.partitioningFields = fields;
-               this.partitioning = partitioning;
-       }
-
-       public void setOrdering(Ordering newOrdering) {
-               this.ordering = newOrdering;
-       }
-
        /**
-        * Filters these properties by what can be preserved by the given node 
when propagated down
+        * Filters these properties by what can be preserved by the given 
SemanticProperties when propagated down
         * to the given input.
         *
-        * @param props The node representing the contract.
-        * @param input The index of the input.
+        * @param props The SemanticProperties which define which fields are 
preserved.
+        * @param input The index of the operator's input.
         * @return The filtered RequestedGlobalProperties
         */
        public RequestedGlobalProperties 
filterBySemanticProperties(SemanticProperties props, int input) {
-               FieldList sourceList;
-               RequestedGlobalProperties returnProps = null;
+               RequestedGlobalProperties returnProps = new 
RequestedGlobalProperties();
 
+               // semantic properties are required; null is rejected
                if (props == null) {
-                       return null;
+                       throw new NullPointerException("SemanticProperties may 
not be null.");
                }
 
-               // check if partitioning survives
-               if (this.ordering != null) {
-                       Ordering no = new Ordering();
-                       returnProps = new RequestedGlobalProperties();
-                       returnProps.setPartitioningFields(new FieldSet(), 
this.partitioning);
+               RequestedGlobalProperties rgProp = new 
RequestedGlobalProperties();
 
-                       for (int index = 0; index < 
this.ordering.getInvolvedIndexes().size(); index++) {
-                               int value = 
this.ordering.getInvolvedIndexes().get(index);
-                               sourceList = props.getSourceField(input, value) 
== null ? null : props.getSourceField(input, value).toFieldList();
-                               if (sourceList != null) {
-                                       no.appendOrdering(sourceList.get(0), 
this.ordering.getType(index), this.ordering.getOrder(index));
-                               } else {
-                                       return null;
+               switch(this.partitioning) {
+                       case FULL_REPLICATION:
+                       case FORCED_REBALANCED:
+                       case CUSTOM_PARTITIONING:
+                       case RANDOM:
+                               // make sure that certain properties are not 
pushed down
+                               return null;
+                       case HASH_PARTITIONED:
+                       case ANY_PARTITIONING:
+                               FieldSet newFields = new FieldSet();
+                               for (Integer targetField : 
this.partitioningFields) {
+                                       int sourceField = 
props.getForwardingSourceField(input, targetField);
+                                       if (sourceField >= 0) {
+                                               newFields = 
newFields.addField(sourceField);
+                                       } else {
+                                               // partial partitionings are 
not preserved to avoid skewed partitioning
+                                               return null;
+                                       }
                                }
-                       }
-                       returnProps.setOrdering(no);
-               } else if (this.partitioningFields != null) {
-                       returnProps = new RequestedGlobalProperties();
-                       returnProps.setPartitioningFields(new FieldSet(), 
this.partitioning);
-                       for (Integer index : this.partitioningFields) {
-                               sourceList = props.getSourceField(input, index) 
== null ? null : props.getSourceField(input, index).toFieldList();
-                               if (sourceList != null) {
-                                       
returnProps.setPartitioningFields(returnProps.getPartitionedFields().addFields(sourceList),
 this.partitioning);
-                               } else {
-                                       return null;
+                               rgProp.partitioning = this.partitioning;
+                               rgProp.partitioningFields = newFields;
+                               return rgProp;
+                       case RANGE_PARTITIONED:
+                               // range partitioning
+                               Ordering newOrdering = new Ordering();
+                               for (int i = 0; i < 
this.ordering.getInvolvedIndexes().size(); i++) {
+                                       int value = 
this.ordering.getInvolvedIndexes().get(i);
+                                       int sourceField = 
props.getForwardingSourceField(input, value);
+                                       if (sourceField >= 0) {
+                                               
newOrdering.appendOrdering(sourceField, this.ordering.getType(i), 
this.ordering.getOrder(i));
+                                       } else {
+                                               return null;
+                                       }
                                }
-                       }
-               }
-               // make sure that certain properties are not pushed down
-               final PartitioningProperty partitioning = this.partitioning;
-               if (partitioning == PartitioningProperty.FULL_REPLICATION ||
-                               partitioning == 
PartitioningProperty.FORCED_REBALANCED ||
-                               partitioning == 
PartitioningProperty.CUSTOM_PARTITIONING)
-               {
-                       return null;
+                               rgProp.partitioning = this.partitioning;
+                               rgProp.ordering = newOrdering;
+                               rgProp.dataDistribution = this.dataDistribution;
+                               return rgProp;
+                       default:
+                               throw new RuntimeException("Unknown 
partitioning type encountered.");
                }
-
-               return returnProps;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
index a5bf118..1f69959 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
@@ -135,50 +134,50 @@ public class RequestedLocalProperties implements 
Cloneable {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Filters these properties by what can be preserved through a user 
function's constant fields set.
-        * Since interesting properties are filtered top-down, anything that 
partially destroys the ordering
-        * makes the properties uninteresting.
-        *
-        * @param props The optimizer node that potentially modifies the 
properties.
-        * @param input The input of the node which is relevant.
+        * Filters these properties by what can be preserved by the given 
SemanticProperties when propagated down
+        * to the given input.
         *
+        * @param props The SemanticProperties which define which fields are 
preserved.
+        * @param input The index of the operator's input.
         * @return The filtered RequestedLocalProperties
         */
        public RequestedLocalProperties 
filterBySemanticProperties(SemanticProperties props, int input) {
-               FieldList sourceList;
-               RequestedLocalProperties returnProps = this;
 
+               // semantic properties are mandatory; a missing instance is a caller error
                if (props == null) {
-                       return null;
+                       throw new NullPointerException("SemanticProperties may 
not be null.");
                }
 
                if (this.ordering != null) {
-                       Ordering no = new Ordering();
-                       returnProps = this.clone();
-                       for (int index = 0; index < 
this.ordering.getInvolvedIndexes().size(); index++) {
-                               int value = 
this.ordering.getInvolvedIndexes().get(index);
-                               sourceList = props.getSourceField(input, value) 
== null ? null : props.getSourceField(input, value).toFieldList();
-                               if (sourceList != null) {
-                                       no.appendOrdering(sourceList.get(0), 
this.ordering.getType(index), this.ordering.getOrder(index));
+                       Ordering newOrdering = new Ordering();
+
+                       for (int i = 0; i < 
this.ordering.getInvolvedIndexes().size(); i++) {
+                               int targetField = 
this.ordering.getInvolvedIndexes().get(i);
+                               int sourceField = 
props.getForwardingSourceField(input, targetField);
+                               if (sourceField >= 0) {
+                                       newOrdering.appendOrdering(sourceField, 
this.ordering.getType(i), this.ordering.getOrder(i));
                                } else {
                                        return null;
                                }
                        }
-                       returnProps.setOrdering(no);
+                       return new RequestedLocalProperties(newOrdering);
+
                } else if (this.groupedFields != null) {
-                       returnProps =  this.clone();
-                       returnProps.setGroupedFields(new FieldList());
+                       FieldSet newGrouping = new FieldSet();
+
                        // check, whether the local key grouping is preserved
-                       for (Integer index : this.groupedFields) {
-                               sourceList = props.getSourceField(input, index) 
== null ? null : props.getSourceField(input, index).toFieldList();
-                               if (sourceList != null) {
-                                       
returnProps.setGroupedFields(returnProps.getGroupedFields().addFields(sourceList));
+                       for (Integer targetField : this.groupedFields) {
+                               int sourceField = 
props.getForwardingSourceField(input, targetField);
+                               if (sourceField >= 0) {
+                                       newGrouping = 
newGrouping.addField(sourceField);
                                } else {
                                        return null;
                                }
                        }
+                       return new RequestedLocalProperties(newGrouping);
+               } else {
+                       return null;
                }
-               return returnProps;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
index ab083ef..5da215f 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
@@ -525,7 +525,7 @@ public abstract class GenericFlatTypePostPass<X, T extends 
AbstractSchema<X>> im
                                Integer pos = entry.getKey();
                                SemanticProperties sprops = 
optNode.getSemanticProperties();
 
-                               if (sprops != null && 
sprops.getForwardFields(input, pos) != null && 
sprops.getForwardFields(input,pos).contains(pos)) {
+                               if (sprops != null && 
sprops.getForwardingTargetFields(input, pos) != null && 
sprops.getForwardingTargetFields(input, pos).contains(pos)) {
                                        targetSchema.addType(pos, 
entry.getValue());
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java 
b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
index 93a1fc5..c3a4c3a 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
@@ -250,7 +250,7 @@ public class DOPChangeTest extends CompilerTestBase {
                
                // submit the plan to the compiler
                OptimizedPlan oPlan = compileNoStats(plan);
-               
+
                // check the optimized Plan
                // when reducer 1 distributes its data across the instances of 
map2, it needs to employ a local hash method,
                // because map2 has twice as many instances and key/value pairs 
with the same key need to be processed by the same
@@ -258,10 +258,7 @@ public class DOPChangeTest extends CompilerTestBase {
                SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
                SingleInputPlanNode red2Node = (SingleInputPlanNode) 
sinkNode.getPredecessor();
 
-               SingleInputPlanNode map2Node = (SingleInputPlanNode) 
red2Node.getPredecessor();
-
-               Assert.assertEquals("The Reduce 2 Node has an invalid local 
strategy.", LocalStrategy.NONE, red2Node.getInput().getLocalStrategy());
-               Assert.assertEquals("The Map 2 Node has an invalid local 
strategy.", LocalStrategy.SORT, map2Node.getInput().getLocalStrategy());
+               Assert.assertEquals("The Reduce 2 Node has an invalid local 
strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 87af91b..9445198 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
@@ -376,14 +376,14 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                }
        }
        
-       @ConstantFields("0")
+       @ForwardedFields("0")
        public static final class Reduce101 extends 
RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
                
                @Override
                public void reduce(Iterable<Tuple1<Long>> values, 
Collector<Tuple1<Long>> out) {}
        }
        
-       @ConstantFields("0")
+       @ForwardedFields("0")
        public static final class DuplicateValue extends 
RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
 
                @Override

Reply via email to