[FLINK-1328] Integrated forwarded Fields into the optimizer (incomplete)

This closes #83


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78f41e9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78f41e9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78f41e9c

Branch: refs/heads/master
Commit: 78f41e9cae597deeafdec934d6052ad72db6946f
Parents: 82c4200
Author: sebastian kunert <[email protected]>
Authored: Sun Aug 31 19:22:56 2014 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Wed Jan 28 01:39:00 2015 +0100

----------------------------------------------------------------------
 .../dag/AbstractPartialSolutionNode.java        |   5 +-
 .../flink/compiler/dag/BinaryUnionNode.java     |   8 +-
 .../flink/compiler/dag/BulkIterationNode.java   |   7 +-
 .../apache/flink/compiler/dag/DataSinkNode.java |   8 +-
 .../flink/compiler/dag/DataSourceNode.java      |   5 +-
 .../apache/flink/compiler/dag/FilterNode.java   |  12 +-
 .../flink/compiler/dag/OptimizerNode.java       |  21 +-
 .../flink/compiler/dag/SingleInputNode.java     |  27 +-
 .../apache/flink/compiler/dag/TwoInputNode.java |  44 +-
 .../flink/compiler/dag/UnaryOperatorNode.java   |  14 +-
 .../compiler/dag/WorksetIterationNode.java      |   9 +-
 .../dataproperties/GlobalProperties.java        |  98 +-
 .../dataproperties/InterestingProperties.java   |  17 +-
 .../dataproperties/LocalProperties.java         |  80 +-
 .../RequestedGlobalProperties.java              |  58 +-
 .../RequestedLocalProperties.java               |  40 +-
 .../postpass/GenericFlatTypePostPass.java       |   5 +-
 .../apache/flink/compiler/DOPChangeTest.java    |   7 +-
 .../compiler/SemanticPropOptimizerTest.java     | 941 +++++++++++++++++++
 .../operators/DualInputSemanticProperties.java  |  83 +-
 .../flink/api/common/operators/Ordering.java    |   2 +-
 .../common/operators/SemanticProperties.java    |  17 +-
 .../SingleInputSemanticProperties.java          |  47 +-
 .../java/graph/ConnectedComponents.java         |   6 +-
 .../record/functions/FunctionAnnotation.java    | 108 ++-
 25 files changed, 1460 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
index 81199a7..c54076d 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
 import org.apache.flink.compiler.plan.PlanNode;
@@ -86,8 +87,8 @@ public abstract class AbstractPartialSolutionNode extends 
OptimizerNode {
        }
 
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               return false;
+       public SemanticProperties getSemanticProperties() {
+               return null;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
index 9003c92..5d805a9 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
@@ -252,8 +254,10 @@ public class BinaryUnionNode extends TwoInputNode {
        protected void readStubAnnotations() {}
 
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               return true;
+       public SemanticProperties getSemanticProperties() {
+               DualInputSemanticProperties sprops = new 
DualInputSemanticProperties();
+               sprops.setAllFieldsConstant(true);
+               return sprops;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
index d500925..18ac4c2 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.compiler.CompilerException;
@@ -182,10 +183,10 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
        public String getName() {
                return "Bulk Iteration";
        }
-       
+
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               return false;
+       public SemanticProperties getSemanticProperties() {
+               return null;
        }
        
        protected void readStubAnnotations() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
index 28a4d7c..2d4fb30 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -230,9 +231,10 @@ public class DataSinkNode extends OptimizerNode {
        // 
--------------------------------------------------------------------------------------------
        //                                   Function Annotation Handling
        // 
--------------------------------------------------------------------------------------------
-       
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               return false;
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return null;
        }
                
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index 39c09a1..752a763 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.io.NonParallelInput;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -193,8 +194,8 @@ public class DataSourceNode extends OptimizerNode {
        }
 
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               return false;
+       public SemanticProperties getSemanticProperties() {
+               return null;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
index df304b1..140734c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
@@ -21,6 +21,8 @@ package org.apache.flink.compiler.dag;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.FilterOperatorBase;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.operators.FilterDescriptor;
@@ -47,10 +49,14 @@ public class FilterNode extends SingleInputNode {
        public String getName() {
                return "Filter";
        }
-       
+
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               return true;
+       public SemanticProperties getSemanticProperties() {
+
+               SingleInputSemanticProperties sprops = new 
SingleInputSemanticProperties();
+               sprops.setAllFieldsConstant(true);
+
+               return sprops;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
index 89cc354..75becf8 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import org.apache.flink.api.common.operators.AbstractUdfOperator;
 import org.apache.flink.api.common.operators.CompilerHints;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
@@ -268,16 +269,8 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         */
        @Override
        public abstract void accept(Visitor<OptimizerNode> visitor);
-       
-       /**
-        * Checks whether a field is modified by the user code or whether it is 
kept unchanged.
-        * 
-        * @param input The input number.
-        * @param fieldNumber The position of the field.
-        * 
-        * @return True if the field is not changed by the user function, false 
otherwise.
-        */
-       public abstract boolean isFieldConstant(int input, int fieldNumber);
+
+       public abstract SemanticProperties getSemanticProperties();
 
        // 
------------------------------------------------------------------------
        //                          Getters / Setters
@@ -688,8 +681,12 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                                        return null;
                                }
                                for (int keyColumn : keyColumns) {
-                                       if (!isFieldConstant(input, keyColumn)) 
{
-                                               return null;    
+                                       FieldSet fs = getSemanticProperties() 
== null ? null : getSemanticProperties().getForwardFields(input, keyColumn);
+
+                                       if (fs == null) {
+                                               return null;
+                                       } else if (!fs.contains(keyColumn)) {
+                                               return null;
                                        }
                                }
                                return keyColumns;

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
index 730c1bb..e7e82fc 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
@@ -31,8 +31,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.PactCompiler;
@@ -144,22 +144,8 @@ public abstract class SingleInputNode extends 
OptimizerNode {
        
 
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               if (input != 0) {
-                       throw new IndexOutOfBoundsException();
-               }
-               
-               SingleInputOperator<?, ?, ?> c = getPactContract();
-               SingleInputSemanticProperties semanticProperties = 
c.getSemanticProperties();
-               
-               if (semanticProperties != null) {
-                       FieldSet fs;
-                       if ((fs = 
semanticProperties.getForwardedField(fieldNumber)) != null) {
-                               return fs.contains(fieldNumber);
-                       }
-               }
-               
-               return false;
+       public SemanticProperties getSemanticProperties() {
+               return ((SingleInputOperator<?,?,?>) 
getPactContract()).getSemanticProperties();
        }
        
 
@@ -444,10 +430,11 @@ public abstract class SingleInputNode extends 
OptimizerNode {
                        LocalProperties lProps = 
in.getLocalProperties().clone();
                        gProps = dps.computeGlobalProperties(gProps);
                        lProps = dps.computeLocalProperties(lProps);
-                       
+
+                       SemanticProperties props = this.getSemanticProperties();
                        // filter by the user code field copies
-                       gProps = gProps.filterByNodesConstantSet(this, 0);
-                       lProps = lProps.filterByNodesConstantSet(this, 0);
+                       gProps = gProps.filterBySemanticProperties(props, 0);
+                       lProps = lProps.filterBySemanticProperties(props, 0);
                        
                        // apply
                        node.initProperties(gProps, lProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index 32f0519..5e9a980 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -31,10 +31,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -554,13 +553,14 @@ public abstract class TwoInputNode extends OptimizerNode {
                        
                        DualInputPlanNode node = operator.instantiate(in1, in2, 
this);
                        node.setBroadcastInputs(broadcastChannelsCombination);
-                       
-                       GlobalProperties gp1 = 
in1.getGlobalProperties().clone().filterByNodesConstantSet(this, 0);
-                       GlobalProperties gp2 = 
in2.getGlobalProperties().clone().filterByNodesConstantSet(this, 1);
+
+                       SemanticProperties props = this.getSemanticProperties();
+                       GlobalProperties gp1 = 
in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
+                       GlobalProperties gp2 = 
in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
                        GlobalProperties combined = 
operator.computeGlobalProperties(gp1, gp2);
 
-                       LocalProperties lp1 = 
in1.getLocalProperties().clone().filterByNodesConstantSet(this, 0);
-                       LocalProperties lp2 = 
in2.getLocalProperties().clone().filterByNodesConstantSet(this, 1);
+                       LocalProperties lp1 = 
in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
+                       LocalProperties lp2 = 
in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
                        LocalProperties locals = 
operator.computeLocalProperties(lp1, lp2);
                        
                        node.initProperties(combined, locals);
@@ -690,37 +690,11 @@ public abstract class TwoInputNode extends OptimizerNode {
                }
        }
 
-       
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               DualInputOperator<?, ?, ?, ?> c = getPactContract();
-               DualInputSemanticProperties semanticProperties = 
c.getSemanticProperties();
-               
-               switch(input) {
-               case 0:
-                       if (semanticProperties != null) {
-                               FieldSet fs;
-                               if ((fs = 
semanticProperties.getForwardedField1(fieldNumber)) != null) {
-                                       return fs.contains(fieldNumber);
-                               }
-                       }
-                       break;
-               case 1:
-                       if(semanticProperties != null) {
-                               FieldSet fs;
-                               if ((fs = 
semanticProperties.getForwardedField2(fieldNumber)) != null) {
-                                       return fs.contains(fieldNumber);
-                               }
-                       }
-                       break;
-               default:
-                       throw new IndexOutOfBoundsException();
-               }
-               
-               return false;
+       public SemanticProperties getSemanticProperties() {
+               return ((DualInputOperator<?, ?, ?, ?>) 
getPactContract()).getSemanticProperties();
        }
        
-       
        // 
--------------------------------------------------------------------------------------------
        //                                     Miscellaneous
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
index 432f904..aaf0a10 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
@@ -21,6 +21,8 @@ package org.apache.flink.compiler.dag;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
@@ -54,12 +56,12 @@ public class UnaryOperatorNode extends SingleInputNode {
        public String getName() {
                return this.name;
        }
-       
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               if (input != 0) {
-                       throw new IndexOutOfBoundsException();
-               }
-               return true;
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               SingleInputSemanticProperties sprops = new 
SingleInputSemanticProperties();
+               sprops.setAllFieldsConstant(true);
+               return sprops;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index ae6bc6b..95fc066 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
@@ -218,12 +219,12 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
        public String getName() {
                return "Workset Iteration";
        }
-       
+
        @Override
-       public boolean isFieldConstant(int input, int fieldNumber) {
-               return false;
+       public SemanticProperties getSemanticProperties() {
+               return null;
        }
-       
+
        protected void readStubAnnotations() {}
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index 7dedc53..4fe632a 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -25,10 +25,10 @@ import java.util.Set;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
-import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -212,7 +212,19 @@ public class GlobalProperties implements Cloneable {
                        return false;
                }
        }
-       
+
+       public Ordering getOrdering() {
+               return this.ordering;
+       }
+
+       public void setOrdering(Ordering ordering) {
+               this.ordering = ordering;
+       }
+
+       public void setPartitioningFields(FieldList partitioningFields) {
+               this.partitioningFields = partitioningFields;
+       }
+
        public boolean isFullyReplicated() {
                return this.partitioning == 
PartitioningProperty.FULL_REPLICATION;
        }
@@ -234,56 +246,86 @@ public class GlobalProperties implements Cloneable {
        }
 
        /**
-        * Filters these properties by what can be preserved through the given 
output contract.
-        * 
-        * @param node The optimizer node.
-        * @param input The input of the node to filter against.
-        * @return The adjusted global properties.
+        * Filters these GlobalProperties by the fields that are constant or 
forwarded to another output field.
+        *
+        * @param props The semantic properties to filter by.
+        * @param input The index of the input.
+        * @return The filtered GlobalProperties
         */
-       public GlobalProperties filterByNodesConstantSet(OptimizerNode node, 
int input) {
+       public GlobalProperties filterBySemanticProperties(SemanticProperties 
props, int input) {
                // check if partitioning survives
+               FieldList forwardFields = null;
+               GlobalProperties returnProps = this;
+
+               if (props == null) {
+                       return new GlobalProperties();
+               }
+
                if (this.ordering != null) {
-                       for (int col : this.ordering.getInvolvedIndexes()) {
-                               if (!node.isFieldConstant(input, col)) {
-                                       return new GlobalProperties();
+                       Ordering no = new Ordering();
+                       for (int index : this.ordering.getInvolvedIndexes()) {
+                               forwardFields = props.getForwardFields(input, 
index) == null ? null: props.getForwardFields(input, index).toFieldList();
+                               if (forwardFields == null) {
+                                       returnProps = new GlobalProperties();
+                                       no = null;
+                                       break;
+                               } else {
+                                       returnProps = returnProps == this ? 
this.clone() : returnProps;
+                                       for (int i = 0; i < 
forwardFields.size(); i++) {
+                                               
no.appendOrdering(forwardFields.get(i), this.ordering.getType(index), 
this.ordering.getOrder(index));
+                                       }
                                }
+                               returnProps.setOrdering(no);
                        }
                }
                if (this.partitioningFields != null) {
-                       for (int colIndex : this.partitioningFields) {
-                               if (!node.isFieldConstant(input, colIndex)) {
-                                       return new GlobalProperties();
+                       returnProps = returnProps == this ? this.clone() : 
returnProps;
+                       returnProps.setPartitioningFields(new FieldList());
+
+                       for (int index : this.partitioningFields) {
+                               forwardFields = props.getForwardFields(input, 
index) == null ? null: props.getForwardFields(input, index).toFieldList();
+                               if (forwardFields == null) {
+                                       returnProps = new GlobalProperties();
+                                       break;
+                               } else  {
+                                       
returnProps.setPartitioningFields(returnProps.getPartitioningFields().addFields(forwardFields));
                                }
                        }
                }
                if (this.uniqueFieldCombinations != null) {
                        HashSet<FieldSet> newSet = new HashSet<FieldSet>();
                        newSet.addAll(this.uniqueFieldCombinations);
-                       
-                       for (Iterator<FieldSet> combos = newSet.iterator(); 
combos.hasNext(); ){
+                       for (Iterator<FieldSet> combos = 
this.uniqueFieldCombinations.iterator(); combos.hasNext(); ){
                                FieldSet current = combos.next();
+                               FieldSet nfs = new FieldSet();
                                for (Integer field : current) {
-                                       if (!node.isFieldConstant(input, 
field)) {
-                                               combos.remove();
+                                       if (props.getForwardFields(input, 
field) == null) {
+                                               newSet.remove(current);
+                                               nfs = null;
                                                break;
+                                       } else {
+                                               nfs = 
nfs.addFields(props.getForwardFields(input, field));
                                        }
                                }
+                               if (nfs != null) {
+                                       newSet.remove(current);
+                                       newSet.add(nfs);
+                               }
                        }
-                       
-                       if (newSet.size() != 
this.uniqueFieldCombinations.size()) {
-                               GlobalProperties gp = clone();
-                               gp.uniqueFieldCombinations = newSet.isEmpty() ? 
null : newSet;
-                               return gp;
-                       }
+
+                       GlobalProperties gp = returnProps.clone();
+                       gp.uniqueFieldCombinations = newSet.isEmpty() ? null : 
newSet;
+                       return gp;
                }
-               
+
                if (this.partitioning == PartitioningProperty.FULL_REPLICATION) 
{
                        return new GlobalProperties();
                }
-               
-               return this;
+
+               return returnProps;
        }
-       
+
+
        public void parameterizeChannel(Channel channel, boolean 
globalDopChange) {
                switch (this.partitioning) {
                        case RANDOM:

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
index 9686484..0f60576 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
@@ -23,7 +23,10 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.compiler.dag.OptimizerNode;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dag.TwoInputNode;
 
 /**
  * The interesting properties that a node in the optimizer plan hands to its 
predecessors. It has the
@@ -87,19 +90,25 @@ public class InterestingProperties implements Cloneable
        public Set<RequestedGlobalProperties> getGlobalProperties() {
                return this.globalProps;
        }
-       
+
        public InterestingProperties filterByCodeAnnotations(OptimizerNode 
node, int input)
        {
                InterestingProperties iProps = new InterestingProperties();
-               
+               SemanticProperties props = null;
+               if (node instanceof SingleInputNode) {
+                       props = node.getSemanticProperties();
+               } else if (node instanceof TwoInputNode) {
+                       props = node.getSemanticProperties();
+               }
+
                for (RequestedGlobalProperties rgp : this.globalProps) {
-                       RequestedGlobalProperties filtered = 
rgp.filterByNodesConstantSet(node, input);
+                       RequestedGlobalProperties filtered = 
rgp.filterBySemanticProperties(props, input);
                        if (filtered != null && !filtered.isTrivial()) {
                                iProps.addGlobalProperties(filtered);
                        }
                }
                for (RequestedLocalProperties rlp : this.localProps) {
-                       RequestedLocalProperties filtered = 
rlp.filterByNodesConstantSet(node, input);
+                       RequestedLocalProperties filtered = 
rlp.filterBySemanticProperties(props, input);
                        if (filtered != null && !filtered.isTrivial()) {
                                iProps.addLocalProperties(filtered);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
index 3c66e56..b06774c 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
@@ -20,12 +20,13 @@
 package org.apache.flink.compiler.dataproperties;
 
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.compiler.dag.OptimizerNode;
 
 /**
  * This class represents local properties of the data. A local property is a 
property that exists
@@ -126,57 +127,91 @@ public class LocalProperties implements Cloneable {
 
        /**
         * Filters these properties by what can be preserved through a user 
function's constant fields set.
-        * 
-        * @param node The optimizer node that potentially modifies the 
properties.
+        *
+        * @param props The semantic properties of the user function that 
potentially modify the properties.
         * @param input The input of the node which is relevant.
-        * 
-        * @return True, if the resulting properties are non trivial.
+        *
+        * @return The filtered LocalProperties
         */
-       public LocalProperties filterByNodesConstantSet(OptimizerNode node, int 
input) {
+       public LocalProperties filterBySemanticProperties(SemanticProperties 
props, int input) {
                // check, whether the local order is preserved
                Ordering no = this.ordering;
                FieldList ngf = this.groupedFields;
                Set<FieldSet> nuf = this.uniqueFields;
-               
+               FieldList forwardList = null;
+
+               if (props == null) {
+                       return new LocalProperties();
+               }
+
                if (this.ordering != null) {
+                       no = new Ordering();
                        final FieldList involvedIndexes = 
this.ordering.getInvolvedIndexes();
                        for (int i = 0; i < involvedIndexes.size(); i++) {
-                               if (!node.isFieldConstant(input, 
involvedIndexes.get(i))) {
-                                       if (i == 0) {
+                               forwardList = props.getForwardFields(input, 
involvedIndexes.get(i)) == null ? null : props.getForwardFields(input, 
involvedIndexes.get(i)).toFieldList();
+
+                               if (forwardList == null) {
+                                       no = null;
+                                       ngf = null;
+                                       /*if (i == 0) {
                                                no = null;
                                                ngf = null;
                                        } else {
                                                no = 
this.ordering.createNewOrderingUpToIndex(i);
                                                ngf = no.getInvolvedIndexes();
-                                       }
+                                       }*/
                                        break;
+                               } else {
+                                       no.appendOrdering(forwardList.get(0), 
this.ordering.getType(i), this.ordering.getOrder(i));
+                                       ngf = no.getInvolvedIndexes();
                                }
                        }
                }
                else if (this.groupedFields != null) {
                        // check, whether the local key grouping is preserved
                        for (Integer index : this.groupedFields) {
-                               if (!node.isFieldConstant(input, index)) {
+                               forwardList = props.getForwardFields(input, 
index) == null ? null : props.getForwardFields(input, index).toFieldList();
+                               if (forwardList == null) {
                                        ngf = null;
+                                       break;
+                               } else if (!forwardList.contains(index)) {
+                                       FieldList grouped = new FieldList();
+                                       for (Integer value : ngf.toFieldList()) 
{
+                                               if (value.intValue() == index) {
+                                                       grouped = 
grouped.addFields(forwardList);
+                                               } else {
+                                                       grouped = 
grouped.addField(value);
+                                               }
+                                       }
+                                       ngf = grouped;
                                }
                        }
                }
-               
-               if (this.uniqueFields != null && this.uniqueFields.size() > 0) {
-                       Set<FieldSet> s = new 
HashSet<FieldSet>(this.uniqueFields);
-                       for (FieldSet fields : this.uniqueFields) {
-                               for (Integer index : fields) {
-                                       if (!node.isFieldConstant(input, 
index)) {
-                                               s.remove(fields);
+
+               if (this.uniqueFields != null) {
+                       HashSet<FieldSet> newSet = new HashSet<FieldSet>();
+                       newSet.addAll(this.uniqueFields);
+                       for (Iterator<FieldSet> combos = 
this.uniqueFields.iterator(); combos.hasNext(); ){
+                               FieldSet current = combos.next();
+                               FieldSet nfs = new FieldSet();
+                               for (Integer field : current) {
+                                       if (props.getForwardFields(input, 
field) == null) {
+                                               newSet.remove(current);
+                                               nfs = null;
                                                break;
+                                       } else {
+                                               nfs = 
nfs.addFields(props.getForwardFields(input, field));
                                        }
                                }
+                               if (nfs != null) {
+                                       newSet.remove(current);
+                                       newSet.add(nfs);
+                               }
                        }
-                       if (s.size() != this.uniqueFields.size()) {
-                               nuf = s;
-                       }
+
+                       nuf = newSet.isEmpty() ? null : newSet;
                }
-               
+
                if (no == this.ordering && ngf == this.groupedFields && nuf == 
this.uniqueFields) {
                        return this;
                } else {
@@ -187,7 +222,6 @@ public class LocalProperties implements Cloneable {
                        return lp;
                }
        }
-
        // 
--------------------------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index 4e9d60a..1320f82 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -21,9 +21,10 @@ package org.apache.flink.compiler.dataproperties;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
-import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -188,30 +189,63 @@ public final class RequestedGlobalProperties implements 
Cloneable {
                this.customPartitioner = null;
        }
 
+       public void setPartitioningFields(FieldSet partitioned) {
+               this.partitioningFields = partitioned;
+       }
+
+       public void setPartitioningFields(FieldSet fields, PartitioningProperty 
partitioning) {
+               this.partitioningFields = fields;
+               this.partitioning = partitioning;
+       }
+
+       public void setOrdering(Ordering newOrdering) {
+               this.ordering = newOrdering;
+       }
+
        /**
         * Filters these properties by what can be preserved by the given node 
when propagated down
         * to the given input.
-        * 
-        * @param node The node representing the contract.
+        *
+        * @param props The semantic properties of the node representing the contract.
         * @param input The index of the input.
-        * @return True, if any non-default value is preserved, false otherwise.
+        * @return The filtered RequestedGlobalProperties
         */
-       public RequestedGlobalProperties filterByNodesConstantSet(OptimizerNode 
node, int input) {
+       public RequestedGlobalProperties 
filterBySemanticProperties(SemanticProperties props, int input) {
+               FieldList sourceList;
+               RequestedGlobalProperties returnProps = null;
+
+               if (props == null) {
+                       return null;
+               }
+
                // check if partitioning survives
                if (this.ordering != null) {
-                       for (int col : this.ordering.getInvolvedIndexes()) {
-                               if (!node.isFieldConstant(input, col)) {
+                       Ordering no = new Ordering();
+                       returnProps = new RequestedGlobalProperties();
+                       returnProps.setPartitioningFields(new FieldSet(), 
this.partitioning);
+
+                       for (int index = 0; index < 
this.ordering.getInvolvedIndexes().size(); index++) {
+                               int value = 
this.ordering.getInvolvedIndexes().get(index);
+                               sourceList = props.getSourceField(input, value) 
== null ? null : props.getSourceField(input, value).toFieldList();
+                               if (sourceList != null) {
+                                       no.appendOrdering(sourceList.get(0), 
this.ordering.getType(index), this.ordering.getOrder(index));
+                               } else {
                                        return null;
                                }
                        }
+                       returnProps.setOrdering(no);
                } else if (this.partitioningFields != null) {
-                       for (int colIndex : this.partitioningFields) {
-                               if (!node.isFieldConstant(input, colIndex)) {
+                       returnProps = new RequestedGlobalProperties();
+                       returnProps.setPartitioningFields(new FieldSet(), 
this.partitioning);
+                       for (Integer index : this.partitioningFields) {
+                               sourceList = props.getSourceField(input, index) 
== null ? null : props.getSourceField(input, index).toFieldList();
+                               if (sourceList != null) {
+                                       
returnProps.setPartitioningFields(returnProps.getPartitionedFields().addFields(sourceList),
 this.partitioning);
+                               } else {
                                        return null;
                                }
                        }
                }
-               
                // make sure that certain properties are not pushed down
                final PartitioningProperty partitioning = this.partitioning;
                if (partitioning == PartitioningProperty.FULL_REPLICATION ||
@@ -220,8 +254,8 @@ public final class RequestedGlobalProperties implements 
Cloneable {
                {
                        return null;
                }
-               
-               return this;
+
+               return returnProps;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
index dc897bd..a5bf118 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
@@ -22,9 +22,9 @@ package org.apache.flink.compiler.dataproperties;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -138,29 +138,47 @@ public class RequestedLocalProperties implements 
Cloneable {
         * Filters these properties by what can be preserved through a user 
function's constant fields set.
         * Since interesting properties are filtered top-down, anything that 
partially destroys the ordering
         * makes the properties uninteresting.
-        * 
-        * @param node The optimizer node that potentially modifies the 
properties.
+        *
+        * @param props The semantic properties of the user function that 
potentially modify the properties.
         * @param input The input of the node which is relevant.
-        * 
-        * @return True, if the resulting properties are non trivial.
+        *
+        * @return The filtered RequestedLocalProperties
         */
-       public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode 
node, int input) {
+       public RequestedLocalProperties 
filterBySemanticProperties(SemanticProperties props, int input) {
+               FieldList sourceList;
+               RequestedLocalProperties returnProps = this;
+
+               if (props == null) {
+                       return null;
+               }
+
                if (this.ordering != null) {
-                       final FieldList involvedIndexes = 
this.ordering.getInvolvedIndexes();
-                       for (int i = 0; i < involvedIndexes.size(); i++) {
-                               if (!node.isFieldConstant(input, 
involvedIndexes.get(i))) {
+                       Ordering no = new Ordering();
+                       returnProps = this.clone();
+                       for (int index = 0; index < 
this.ordering.getInvolvedIndexes().size(); index++) {
+                               int value = 
this.ordering.getInvolvedIndexes().get(index);
+                               sourceList = props.getSourceField(input, value) 
== null ? null : props.getSourceField(input, value).toFieldList();
+                               if (sourceList != null) {
+                                       no.appendOrdering(sourceList.get(0), 
this.ordering.getType(index), this.ordering.getOrder(index));
+                               } else {
                                        return null;
                                }
                        }
+                       returnProps.setOrdering(no);
                } else if (this.groupedFields != null) {
+                       returnProps =  this.clone();
+                       returnProps.setGroupedFields(new FieldList());
                        // check, whether the local key grouping is preserved
                        for (Integer index : this.groupedFields) {
-                               if (!node.isFieldConstant(input, index)) {
+                               sourceList = props.getSourceField(input, index) 
== null ? null : props.getSourceField(input, index).toFieldList();
+                               if (sourceList != null) {
+                                       
returnProps.setGroupedFields(returnProps.getGroupedFields().addFields(sourceList));
+                               } else {
                                        return null;
                                }
                        }
                }
-               return this;
+               return returnProps;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
index 717a0c2..ab083ef 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.postpass;
 
 import java.util.Map;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -522,7 +523,9 @@ public abstract class GenericFlatTypePostPass<X, T extends 
AbstractSchema<X>> im
                try {
                        for (Map.Entry<Integer, X> entry : sourceSchema) {
                                Integer pos = entry.getKey();
-                               if (optNode.isFieldConstant(input, pos)) {
+                               SemanticProperties sprops = 
optNode.getSemanticProperties();
+
+                               if (sprops != null && 
sprops.getForwardFields(input, pos) != null && 
sprops.getForwardFields(input,pos).contains(pos)) {
                                        targetSchema.addType(pos, 
entry.getValue());
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java 
b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
index 8b8828e..93a1fc5 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
@@ -257,8 +257,11 @@ public class DOPChangeTest extends CompilerTestBase {
                // mapper respectively reducer
                SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
                SingleInputPlanNode red2Node = (SingleInputPlanNode) 
sinkNode.getPredecessor();
-               
-               Assert.assertEquals("The Reduce 2 Node has an invalid local 
strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy());
+
+               SingleInputPlanNode map2Node = (SingleInputPlanNode) 
red2Node.getPredecessor();
+
+               Assert.assertEquals("The Reduce 2 Node has an invalid local 
strategy.", LocalStrategy.NONE, red2Node.getInput().getLocalStrategy());
+               Assert.assertEquals("The Map 2 Node has an invalid local 
strategy.", LocalStrategy.SORT, map2Node.getInput().getLocalStrategy());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java
new file mode 100644
index 0000000..49e5d85
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/SemanticPropOptimizerTest.java
@@ -0,0 +1,941 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.PlanNode;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class SemanticPropOptimizerTest extends CompilerTestBase {
+
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, 
Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, 
Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       public static class SimpleReducer implements 
ReduceFunction<Tuple3<Integer, Integer, Integer>> {
+               @Override
+               public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, 
Integer, Integer> value1, Tuple3<Integer, Integer, Integer> value2) throws 
Exception {
+                       return null;
+               }
+       }
+
+       public static class SimpleMap implements MapFunction<Tuple3<Integer, 
Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+               @Override
+               public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, 
Integer, Integer> value) throws Exception {
+                       return null;
+               }
+       }
+
+       @Test
+       public void forwardFieldsTestMapReduce() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               set = set.map(new SimpleMap()).withConstantSet("*")
+                               .groupBy(0)
+                               .reduce(new 
SimpleReducer()).withConstantSet("0->1")
+                               .map(new SimpleMap()).withConstantSet("*")
+                               .groupBy(1)
+                               .reduce(new 
SimpleReducer()).withConstantSet("*");
+
+               set.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               oPlan.accept(new Visitor<PlanNode>() {
+                       @Override
+                       public boolean preVisit(PlanNode visitable) {
+                               if (visitable instanceof SingleInputPlanNode && 
visitable.getPactContract() instanceof ReduceOperatorBase) {
+                                       for (Channel input: 
visitable.getInputs()) {
+                                               GlobalProperties gprops = 
visitable.getGlobalProperties();
+                                               LocalProperties lprops = 
visitable.getLocalProperties();
+
+                                               Assert.assertTrue("Reduce 
should just forward the input if it is already partitioned",
+                                                               
input.getShipStrategy() == ShipStrategyType.FORWARD);
+                                               Assert.assertTrue("Wrong 
GlobalProperties on Reducer",
+                                                               
gprops.isPartitionedOnFields(new FieldSet(1)));
+                                               Assert.assertTrue("Wrong 
GlobalProperties on Reducer",
+                                                               
gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED);
+                                               Assert.assertTrue("Wrong 
LocalProperties on Reducer",
+                                                               
lprops.getGroupedFields().contains(1));
+                                       }
+                               }
+                               if (visitable instanceof SingleInputPlanNode && 
visitable.getPactContract() instanceof MapOperatorBase) {
+                                       for (Channel input: 
visitable.getInputs()) {
+                                               GlobalProperties gprops = 
visitable.getGlobalProperties();
+                                               LocalProperties lprops = 
visitable.getLocalProperties();
+
+                                               Assert.assertTrue("Map should 
just forward the input if it is already partitioned",
+                                                               
input.getShipStrategy() == ShipStrategyType.FORWARD);
+                                               Assert.assertTrue("Wrong 
GlobalProperties on Mapper",
+                                                               
gprops.isPartitionedOnFields(new FieldSet(1)));
+                                               Assert.assertTrue("Wrong 
GlobalProperties on Mapper",
+                                                               
gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED);
+                                               Assert.assertTrue("Wrong 
LocalProperties on Mapper",
+                                                               
lprops.getGroupedFields().contains(1));
+                                       }
+                                       return false;
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void postVisit(PlanNode visitable) {
+
+                       }
+               });
+       }
+
+       /**
+        * Filters a grouping on fields (0, 3, 5, 7) through the forwards
+        * {@code 0->2, 3->1, 5->5}. Field 7 has no forward entry, and the test
+        * expects the filtered properties to carry no ordering, no grouping and
+        * no unique fields.
+        */
+       @Test
+       public void localPropertiesFilterNothingPreservedTest() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forGrouping(grouping);
+
+               LocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected value on failure
+               Assert.assertNull(result.getOrdering());
+               Assert.assertNull(result.getGroupedFields());
+               Assert.assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * Calls the filter with input index 1 on single-input semantic
+        * properties and expects an exception to be thrown.
+        */
+       @Test
+       public void localPropertiesFilterWrongInputTest() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forGrouping(grouping);
+
+               try {
+                       lprops.filterBySemanticProperties(sprops, 1);
+               } catch (Exception expected) {
+                       // any exception counts as the expected rejection of the input index
+                       return;
+               }
+               Assert.fail("Filtering with input index 1 should have thrown an exception");
+       }
+
+       /**
+        * Filters a grouping on fields (0, 3, 5, 7) through the forwards
+        * {@code 0->2, 3->1, 5->5, 7->0,4}. The test expects a grouping on the
+        * five target fields 2, 1, 5, 0 and 4, with no ordering and no unique
+        * fields.
+        */
+       @Test
+       public void localPropertiesFilterGroupingPreservedTest() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->2", "3->1", "5->5", "7->0,4"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forGrouping(grouping);
+
+               LocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               Assert.assertNull(result.getOrdering());
+
+               // assertEquals prints expected and actual size when the grouping is wrong
+               Assert.assertEquals(5, result.getGroupedFields().size());
+               Assert.assertTrue(result.getGroupedFields().contains(2));
+               Assert.assertTrue(result.getGroupedFields().contains(1));
+               Assert.assertTrue(result.getGroupedFields().contains(5));
+               Assert.assertTrue(result.getGroupedFields().contains(0));
+               Assert.assertTrue(result.getGroupedFields().contains(4));
+
+               Assert.assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * Filters a grouping on fields (0, 3, 5, 7) through identity forwards
+        * ({@code 0->0, 3->3, 5->5, 7->7}). The test expects the grouping to
+        * still cover exactly fields 0, 3, 5 and 7, with no ordering and no
+        * unique fields.
+        */
+       @Test
+       public void localPropertiesFilterGroupingPreservedTest2() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->0", "3->3", "5->5", "7->7"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forGrouping(grouping);
+
+               LocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               Assert.assertNull(result.getOrdering());
+
+               // assertEquals prints expected and actual size when the grouping is wrong
+               Assert.assertEquals(4, result.getGroupedFields().size());
+               Assert.assertTrue(result.getGroupedFields().contains(0));
+               Assert.assertTrue(result.getGroupedFields().contains(3));
+               Assert.assertTrue(result.getGroupedFields().contains(5));
+               Assert.assertTrue(result.getGroupedFields().contains(7));
+
+               Assert.assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * Filters a grouping on fields (0, 3, 5, 7) through the forwards
+        * {@code 0->2, 3->1, 7->0,4}. Field 5 has no forward entry, and the
+        * test expects the filtered properties to have no grouped fields.
+        */
+       @Test
+       public void localPropertiesFilterGroupingNothingPreservedTest() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->2", "3->1", "7->0,4"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forGrouping(grouping);
+
+               LocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected grouping on failure
+               Assert.assertNull(result.getGroupedFields());
+       }
+
+       /**
+        * Filters an ascending ordering on fields 0, 1, 2 through the forwards
+        * {@code 0->2, 1->3, 2->0}. The test expects an ordering over three
+        * fields that contains 2, 3 and 0, each in ascending order.
+        */
+       @Test
+       public void localPropertiesFilterOrderingPreservedTest() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+               ordering.appendOrdering(1, null, Order.ASCENDING);
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+
+               String[] constantSet = {"0->2", "1->3", "2->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forOrdering(ordering);
+
+               LocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList involved = result.getOrdering().getInvolvedIndexes();
+               Order[] orders = result.getOrdering().getFieldOrders();
+
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, involved.size());
+               Assert.assertTrue(involved.contains(2));
+               Assert.assertTrue(involved.contains(3));
+               Assert.assertTrue(involved.contains(0));
+               Assert.assertEquals(Order.ASCENDING, orders[0]);
+               Assert.assertEquals(Order.ASCENDING, orders[1]);
+               Assert.assertEquals(Order.ASCENDING, orders[2]);
+       }
+
+       /**
+        * Filters an ascending ordering on fields 0, 1, 2 through the forwards
+        * {@code 0->2, 2->0}. Field 1 has no forward entry, and the test
+        * expects the filtered properties to have no ordering.
+        */
+       @Test
+       public void localPropertiesFilterOrderingNothingPreservedTest() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+               ordering.appendOrdering(1, null, Order.ASCENDING);
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+
+               String[] constantSet = {"0->2", "2->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forOrdering(ordering);
+
+               LocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected ordering on failure
+               Assert.assertNull(result.getOrdering());
+       }
+
+
+
+       /**
+        * Filters the unique field sets {0,1,2,3}, {4,5,6,7} and {0,1,6,2}
+        * through the forwards {@code 0->1, 1->2, 3->4, 2->3, 6->0}. The test
+        * expects exactly two unique sets afterwards: {1,2,3,4} and {0,1,2,3}.
+        */
+       @Test
+       public void localPropertiesFilterUniqueFieldsPreservedTest() {
+               String[] constantSet = {"0->1", "1->2", "3->4", "2->3", "6->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               FieldSet set1 = new FieldSet(0, 1, 2, 3);
+               FieldSet set2 = new FieldSet(4, 5, 6, 7);
+               FieldSet set3 = new FieldSet(0, 1, 6, 2);
+               LocalProperties lprops = new LocalProperties();
+               lprops = lprops.addUniqueFields(set1).addUniqueFields(set2).addUniqueFields(set3);
+
+               LocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+               Set<FieldSet> unique = result.getUniqueFields();
+               FieldSet expected1 = new FieldSet(1, 2, 3, 4);
+               FieldSet expected2 = new FieldSet(0, 1, 2, 3);
+
+               // assertEquals prints expected and actual set count on failure
+               Assert.assertEquals(2, unique.size());
+               Assert.assertTrue(unique.contains(expected1));
+               Assert.assertTrue(unique.contains(expected2));
+       }
+
+       /**
+        * Filters a requested grouping on fields (0, 3, 5, 7) through the
+        * forwards {@code 0->2, 3->1, 5->5}. The test expects the filter to
+        * return {@code null}.
+        */
+       @Test
+       public void requestedLocalPropertiesFilterNothingPreservedTest() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops = new RequestedLocalProperties();
+               lprops.setGroupedFields(grouping);
+
+               RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected result on failure
+               Assert.assertNull(result);
+       }
+
+       /**
+        * Calls the requested-local-properties filter with input index 1 on
+        * single-input semantic properties and expects an exception to be
+        * thrown.
+        */
+       @Test
+       public void requestedLocalPropertiesFilterWrongInputTest() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops = new RequestedLocalProperties();
+               lprops.setGroupedFields(grouping);
+
+               try {
+                       lprops.filterBySemanticProperties(sprops, 1);
+               } catch (Exception expected) {
+                       // any exception counts as the expected rejection of the input index
+                       return;
+               }
+               Assert.fail("Filtering with input index 1 should have thrown an exception");
+       }
+
+       /**
+        * Filters a requested grouping on fields (0, 3, 5, 7) through the
+        * forwards {@code 6->7, 1->5, 4->3, 7->0}. The test expects a requested
+        * grouping on the four source fields 7, 4, 1 and 6.
+        */
+       @Test
+       public void requestedLocalPropertiesFilterGroupingPreservedTest() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"6->7", "1->5", "4->3", "7->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops = new RequestedLocalProperties();
+               lprops.setGroupedFields(grouping);
+
+               RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               // assertEquals prints expected and actual size when the grouping is wrong
+               Assert.assertEquals(4, result.getGroupedFields().size());
+               Assert.assertTrue(result.getGroupedFields().contains(7));
+               Assert.assertTrue(result.getGroupedFields().contains(4));
+               Assert.assertTrue(result.getGroupedFields().contains(1));
+               Assert.assertTrue(result.getGroupedFields().contains(6));
+       }
+
+       /**
+        * Filters a requested grouping on fields (0, 3, 5, 7) through identity
+        * forwards ({@code 0->0, 3->3, 5->5, 7->7}). The test expects the
+        * requested grouping to still cover exactly fields 0, 3, 5 and 7.
+        */
+       @Test
+       public void requestedLocalPropertiesFilterGroupingPreservedTest2() {
+               FieldList grouping = new FieldList().addFields(0, 3, 5, 7);
+
+               String[] constantSet = {"0->0", "3->3", "5->5", "7->7"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops = new RequestedLocalProperties();
+               lprops.setGroupedFields(grouping);
+
+               RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               // assertEquals prints expected and actual size when the grouping is wrong
+               Assert.assertEquals(4, result.getGroupedFields().size());
+               Assert.assertTrue(result.getGroupedFields().contains(0));
+               Assert.assertTrue(result.getGroupedFields().contains(3));
+               Assert.assertTrue(result.getGroupedFields().contains(5));
+               Assert.assertTrue(result.getGroupedFields().contains(7));
+       }
+
+       /**
+        * Filters a requested ascending ordering on fields 2, 3, 0 through the
+        * forwards {@code 6->2, 2->3, 5->0}. The test expects a requested
+        * ordering over three fields that contains the source fields 6, 2 and
+        * 5, each in ascending order.
+        */
+       @Test
+       public void requestedLocalPropertiesFilterOrderingPreservedTest() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+               ordering.appendOrdering(3, null, Order.ASCENDING);
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+
+               String[] constantSet = {"6->2", "2->3", "5->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops = new RequestedLocalProperties();
+               lprops.setOrdering(ordering);
+
+               RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList involved = result.getOrdering().getInvolvedIndexes();
+               Order[] orders = result.getOrdering().getFieldOrders();
+
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, involved.size());
+               Assert.assertTrue(involved.contains(6));
+               Assert.assertTrue(involved.contains(2));
+               Assert.assertTrue(involved.contains(5));
+               Assert.assertEquals(Order.ASCENDING, orders[0]);
+               Assert.assertEquals(Order.ASCENDING, orders[1]);
+               Assert.assertEquals(Order.ASCENDING, orders[2]);
+       }
+
+       /**
+        * Filters a requested ascending ordering on fields 0, 3, 6 through
+        * identity forwards ({@code 0->0, 3->3, 6->6}). The test expects the
+        * requested ordering to still contain fields 0, 3 and 6, each in
+        * ascending order.
+        */
+       @Test
+       public void requestedLocalPropertiesFilterOrderingPreservedTest2() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+               ordering.appendOrdering(3, null, Order.ASCENDING);
+               ordering.appendOrdering(6, null, Order.ASCENDING);
+
+               String[] constantSet = {"0->0", "3->3", "6->6"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops = new RequestedLocalProperties();
+               lprops.setOrdering(ordering);
+
+               RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList involved = result.getOrdering().getInvolvedIndexes();
+               Order[] orders = result.getOrdering().getFieldOrders();
+
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, involved.size());
+               Assert.assertTrue(involved.contains(0));
+               Assert.assertTrue(involved.contains(3));
+               Assert.assertTrue(involved.contains(6));
+               Assert.assertEquals(Order.ASCENDING, orders[0]);
+               Assert.assertEquals(Order.ASCENDING, orders[1]);
+               Assert.assertEquals(Order.ASCENDING, orders[2]);
+       }
+
+       /**
+        * Filters a requested ascending ordering on fields 2, 3, 0 through the
+        * forwards {@code 6->2, 5->0}. No forward targets field 3, and the test
+        * expects the filter to return {@code null}.
+        */
+       @Test
+       public void requestedLocalPropertiesFilterOrderingNothingPreservedTest() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+               ordering.appendOrdering(3, null, Order.ASCENDING);
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+
+               String[] constantSet = {"6->2", "5->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops = new RequestedLocalProperties();
+               lprops.setOrdering(ordering);
+
+               RequestedLocalProperties result = lprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected result on failure
+               Assert.assertNull(result);
+       }
+
+       /**
+        * Filters the same requested ascending ordering (fields 2, 3, 0)
+        * against both inputs of dual-input semantic properties, where each
+        * input uses the forwards {@code 6->2, 2->3, 5->0}. For both inputs the
+        * test expects an ordering over three fields that contains 6, 2 and 5,
+        * each in ascending order.
+        */
+       @Test
+       public void requestedLocalPropertiesDualFilterOrderingPreservedTest() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+               ordering.appendOrdering(3, null, Order.ASCENDING);
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+
+               String[] constantSet = {"6->2", "2->3", "5->0"};
+               DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet, constantSet, null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops1 = new RequestedLocalProperties();
+               lprops1.setOrdering(ordering);
+
+               RequestedLocalProperties lprops2 = new RequestedLocalProperties();
+               lprops2.setOrdering(ordering);
+
+               RequestedLocalProperties result1 = lprops1.filterBySemanticProperties(dprops, 0);
+               RequestedLocalProperties result2 = lprops2.filterBySemanticProperties(dprops, 1);
+
+               FieldSet involved1 = result1.getOrdering().getInvolvedIndexes();
+               FieldSet involved2 = result2.getOrdering().getInvolvedIndexes();
+               Order[] orders1 = result1.getOrdering().getFieldOrders();
+               Order[] orders2 = result2.getOrdering().getFieldOrders();
+
+               // first input; assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, involved1.size());
+               Assert.assertTrue(involved1.contains(6));
+               Assert.assertTrue(involved1.contains(2));
+               Assert.assertTrue(involved1.contains(5));
+               Assert.assertEquals(Order.ASCENDING, orders1[0]);
+               Assert.assertEquals(Order.ASCENDING, orders1[1]);
+               Assert.assertEquals(Order.ASCENDING, orders1[2]);
+
+               // second input
+               Assert.assertEquals(3, involved2.size());
+               Assert.assertTrue(involved2.contains(6));
+               Assert.assertTrue(involved2.contains(2));
+               Assert.assertTrue(involved2.contains(5));
+               Assert.assertEquals(Order.ASCENDING, orders2[0]);
+               Assert.assertEquals(Order.ASCENDING, orders2[1]);
+               Assert.assertEquals(Order.ASCENDING, orders2[2]);
+       }
+
+       /**
+        * Filters freshly constructed global properties through the forwards
+        * {@code 0->2, 3->1, 5->5}. The test expects the result to have no
+        * ordering and no partitioning fields.
+        */
+       @Test
+       public void globalPropertiesFilterNothingPreservedTest() {
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected value on failure
+               Assert.assertNull(result.getOrdering());
+               Assert.assertNull(result.getPartitioningFields());
+       }
+
+       /**
+        * Calls the global-properties filter with input index 1 on
+        * single-input semantic properties and expects an exception to be
+        * thrown.
+        */
+       @Test
+       public void globalPropertiesFilterWrongInputTest() {
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 3, 4, 6));
+
+               try {
+                       gprops.filterBySemanticProperties(sprops, 1);
+               } catch (Exception expected) {
+                       // any exception counts as the expected rejection of the input index
+                       return;
+               }
+               Assert.fail("Filtering with input index 1 should have thrown an exception");
+       }
+
+       /**
+        * Filters a hash partitioning on fields (0, 1, 4) through the forwards
+        * {@code 0->2, 1->4, 4->7}. The test expects a hash partitioning on the
+        * three target fields 2, 4 and 7.
+        */
+       @Test
+       public void globalPropertiesFilterPartitioningPreservedTest() {
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+               String[] constantSet = {"0->2", "1->4", "4->7"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList list = result.getPartitioningFields();
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, list.size());
+               Assert.assertTrue(list.contains(2));
+               Assert.assertTrue(list.contains(4));
+               Assert.assertTrue(list.contains(7));
+               Assert.assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+       }
+
+       /**
+        * Filters a hash partitioning on fields (0, 1, 4) through forwards in
+        * which fields 0 and 1 each map to two targets
+        * ({@code 0->2, 6}, {@code 1->4, 5}, {@code 4->7}). The test expects a
+        * hash partitioning on the five target fields 2, 4, 6, 5 and 7.
+        */
+       @Test
+       public void globalPropertiesFilterPartitioningPreservedTest2() {
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+               String[] constantSet = {"0->2, 6", "1->4, 5", "4->7"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList list = result.getPartitioningFields();
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(5, list.size());
+               Assert.assertTrue(list.contains(2));
+               Assert.assertTrue(list.contains(4));
+               Assert.assertTrue(list.contains(6));
+               Assert.assertTrue(list.contains(5));
+               Assert.assertTrue(list.contains(7));
+               Assert.assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+       }
+
+       /**
+        * Filters a hash partitioning on fields (0, 1, 4) through identity
+        * forwards ({@code 0->0, 1->1, 4->4}). The test expects the hash
+        * partitioning to still cover exactly fields 0, 1 and 4.
+        */
+       @Test
+       public void globalPropertiesFilterPartitioningPreservedTest3() {
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+               String[] constantSet = {"0->0", "1->1", "4->4"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList list = result.getPartitioningFields();
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, list.size());
+               Assert.assertTrue(list.contains(0));
+               Assert.assertTrue(list.contains(1));
+               Assert.assertTrue(list.contains(4));
+               Assert.assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+       }
+
+       /**
+        * Filters a global ascending ordering on fields 0, 1, 2 through the
+        * forwards {@code 0->2, 1->3, 2->0}. The test expects an ordering over
+        * three fields that contains 2, 3 and 0, each in ascending order.
+        */
+       @Test
+       public void globalPropertiesFilterOrderingPreservedTest() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+               ordering.appendOrdering(1, null, Order.ASCENDING);
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+
+               String[] constantSet = {"0->2", "1->3", "2->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setOrdering(ordering);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList involved = result.getOrdering().getInvolvedIndexes();
+               Order[] orders = result.getOrdering().getFieldOrders();
+
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, involved.size());
+               Assert.assertTrue(involved.contains(2));
+               Assert.assertTrue(involved.contains(3));
+               Assert.assertTrue(involved.contains(0));
+               Assert.assertEquals(Order.ASCENDING, orders[0]);
+               Assert.assertEquals(Order.ASCENDING, orders[1]);
+               Assert.assertEquals(Order.ASCENDING, orders[2]);
+       }
+
+       /**
+        * Filters a global ascending ordering on fields 0, 1, 2 through
+        * forwards in which field 0 maps to two targets
+        * ({@code 0->2,4}, {@code 1->3}, {@code 2->0}). The test expects an
+        * ordering over four fields that contains 2, 3, 4 and 0, each in
+        * ascending order.
+        */
+       @Test
+       public void globalPropertiesFilterOrderingPreservedTest2() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+               ordering.appendOrdering(1, null, Order.ASCENDING);
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+
+               String[] constantSet = {"0->2,4", "1->3", "2->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setOrdering(ordering);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList involved = result.getOrdering().getInvolvedIndexes();
+               Order[] orders = result.getOrdering().getFieldOrders();
+
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(4, involved.size());
+               Assert.assertTrue(involved.contains(2));
+               Assert.assertTrue(involved.contains(3));
+               Assert.assertTrue(involved.contains(4));
+               Assert.assertTrue(involved.contains(0));
+               Assert.assertEquals(Order.ASCENDING, orders[0]);
+               Assert.assertEquals(Order.ASCENDING, orders[1]);
+               Assert.assertEquals(Order.ASCENDING, orders[2]);
+               Assert.assertEquals(Order.ASCENDING, orders[3]);
+       }
+
+       /**
+        * Filters a global ascending ordering on fields 0, 1, 2 through
+        * identity forwards ({@code 0->0, 1->1, 2->2}). The test expects the
+        * ordering to still contain fields 0, 1 and 2, each in ascending order.
+        */
+       @Test
+       public void globalPropertiesFilterOrderingPreservedTest3() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+               ordering.appendOrdering(1, null, Order.ASCENDING);
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+
+               String[] constantSet = {"0->0", "1->1", "2->2"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setOrdering(ordering);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               FieldList involved = result.getOrdering().getInvolvedIndexes();
+               Order[] orders = result.getOrdering().getFieldOrders();
+
+               // assertEquals prints expected and actual values on failure
+               Assert.assertEquals(3, involved.size());
+               Assert.assertTrue(involved.contains(0));
+               Assert.assertTrue(involved.contains(1));
+               Assert.assertTrue(involved.contains(2));
+               Assert.assertEquals(Order.ASCENDING, orders[0]);
+               Assert.assertEquals(Order.ASCENDING, orders[1]);
+               Assert.assertEquals(Order.ASCENDING, orders[2]);
+       }
+
+       /**
+        * Filters a global ascending ordering on fields 0, 1, 2 through the
+        * forwards {@code 0->2, 2->0}. Field 1 has no forward entry, and the
+        * test expects the filtered properties to have no ordering.
+        */
+       @Test
+       public void globalPropertiesFilterOrderingNothingPreservedTest() {
+               Ordering ordering = new Ordering();
+               ordering.appendOrdering(0, null, Order.ASCENDING);
+               ordering.appendOrdering(1, null, Order.ASCENDING);
+               ordering.appendOrdering(2, null, Order.ASCENDING);
+
+               String[] constantSet = {"0->2", "2->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setOrdering(ordering);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected ordering on failure
+               Assert.assertNull(result.getOrdering());
+       }
+
+       /**
+        * Filters the global unique field combinations {0,1,2,3}, {4,5,6,7}
+        * and {0,1,6,2} through the forwards
+        * {@code 0->1, 1->2, 3->4, 2->3, 6->0}. The test expects exactly two
+        * combinations afterwards: {1,2,3,4} and {0,1,2,3}.
+        */
+       @Test
+       public void globalPropertiesFilterUniqueFieldsPreservedTest() {
+               String[] constantSet = {"0->1", "1->2", "3->4", "2->3", "6->0"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               FieldSet set1 = new FieldSet(0, 1, 2, 3);
+               FieldSet set2 = new FieldSet(4, 5, 6, 7);
+               FieldSet set3 = new FieldSet(0, 1, 6, 2);
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.addUniqueFieldCombination(set1);
+               gprops.addUniqueFieldCombination(set2);
+               gprops.addUniqueFieldCombination(set3);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+               Set<FieldSet> unique = result.getUniqueFieldCombination();
+               FieldSet expected1 = new FieldSet(1, 2, 3, 4);
+               FieldSet expected2 = new FieldSet(0, 1, 2, 3);
+
+               // assertEquals prints expected and actual combination count on failure
+               Assert.assertEquals(2, unique.size());
+               Assert.assertTrue(unique.contains(expected1));
+               Assert.assertTrue(unique.contains(expected2));
+       }
+
+       /**
+        * Filters freshly constructed requested global properties through the
+        * forwards {@code 0->2, 3->1, 5->5}. The test expects the filter to
+        * return {@code null}.
+        */
+       @Test
+       public void requestedGlobalPropertiesFilterNothingPreservedTest() {
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties gprops = new RequestedGlobalProperties();
+               RequestedGlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               // assertNull reports the unexpected result on failure
+               Assert.assertNull(result);
+       }
+
+       /**
+        * Filters requested hash partitionings against both inputs of
+        * dual-input semantic properties. Input 0 uses the forwards
+        * {@code 0->1,2, 3->0, 2->4} with a request on fields (2, 0, 4); input 1
+        * uses {@code 0->3, 4->6, 6->7} with a request on fields (3, 6, 7). The
+        * test expects hash partitionings on source fields (0, 3, 2) for input
+        * 0 and (0, 4, 6) for input 1.
+        */
+       @Test
+       public void requestedGlobalPropertiesDualFilterEverythingPreservedTest() {
+               String[] constantSet1 = {"0->1,2", "3->0", "2->4"};
+               String[] constantSet2 = {"0->3", "4->6", "6->7"};
+
+               DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet1, constantSet2, null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties gprops1 = new RequestedGlobalProperties();
+               RequestedGlobalProperties gprops2 = new RequestedGlobalProperties();
+
+               gprops1.setHashPartitioned(new FieldSet(2, 0, 4));
+               gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
+
+               gprops1 = gprops1.filterBySemanticProperties(dprops, 0);
+               gprops2 = gprops2.filterBySemanticProperties(dprops, 1);
+
+               // first input; assertEquals prints expected and actual values on failure
+               Assert.assertEquals(PartitioningProperty.HASH_PARTITIONED, gprops1.getPartitioning());
+               Assert.assertEquals(3, gprops1.getPartitionedFields().size());
+               Assert.assertTrue(gprops1.getPartitionedFields().contains(0));
+               Assert.assertTrue(gprops1.getPartitionedFields().contains(3));
+               Assert.assertTrue(gprops1.getPartitionedFields().contains(2));
+
+               // second input
+               Assert.assertEquals(PartitioningProperty.HASH_PARTITIONED, gprops2.getPartitioning());
+               Assert.assertEquals(3, gprops2.getPartitionedFields().size());
+               Assert.assertTrue(gprops2.getPartitionedFields().contains(0));
+               Assert.assertTrue(gprops2.getPartitionedFields().contains(4));
+               Assert.assertTrue(gprops2.getPartitionedFields().contains(6));
+       }
+
+       /**
+        * Filters requested hash partitionings against both inputs of
+        * dual-input semantic properties. Input 0 uses the forwards
+        * {@code 0->1,2, 3->0, 2->4} with a request on fields (6, 7), which no
+        * forward of that input targets; the test expects {@code null} for it.
+        * Input 1 uses {@code 0->3, 4->6, 6->7} with a request on fields
+        * (3, 6, 7); the test expects a hash partitioning on source fields
+        * (0, 4, 6).
+        */
+       @Test
+       public void requestedGlobalPropertiesDualFilterNothingPreservedTest() {
+               String[] constantSet1 = {"0->1,2", "3->0", "2->4"};
+               String[] constantSet2 = {"0->3", "4->6", "6->7"};
+
+               DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsDualFromString(dprops, constantSet1, constantSet2, null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties gprops1 = new RequestedGlobalProperties();
+               RequestedGlobalProperties gprops2 = new RequestedGlobalProperties();
+
+               gprops1.setHashPartitioned(new FieldSet(6, 7));
+               gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
+
+               gprops1 = gprops1.filterBySemanticProperties(dprops, 0);
+               gprops2 = gprops2.filterBySemanticProperties(dprops, 1);
+
+               // assertNull reports the unexpected result on failure
+               Assert.assertNull(gprops1);
+
+               Assert.assertEquals(PartitioningProperty.HASH_PARTITIONED, gprops2.getPartitioning());
+               Assert.assertEquals(3, gprops2.getPartitionedFields().size());
+               Assert.assertTrue(gprops2.getPartitionedFields().contains(0));
+               Assert.assertTrue(gprops2.getPartitionedFields().contains(4));
+               Assert.assertTrue(gprops2.getPartitionedFields().contains(6));
+       }
+
+       /**
+        * Calls the requested-global-properties filter with input index 1 on
+        * single-input semantic properties and expects an exception to be
+        * thrown.
+        */
+       @Test
+       public void requestedGlobalPropertiesFilterWrongInputTest() {
+               String[] constantSet = {"0->2", "3->1", "5->5"};
+               SingleInputSemanticProperties sprops = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties gprops = new RequestedGlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 3, 4, 6));
+
+               try {
+                       gprops.filterBySemanticProperties(sprops, 1);
+               } catch (Exception expected) {
+                       // any exception counts as the expected rejection of the input index
+                       return;
+               }
+               Assert.fail("Filtering with input index 1 should have thrown an exception");
+       }
+
+       /**
+        * A hash partitioning requested on the output fields (0, 1, 4) must be translated
+        * to the input fields (3, 5, 2) they are forwarded from.
+        */
+       @Test
+       public void requestedGlobalPropertiesFilterPartitioningPreservedTest() {
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setHashPartitioned(new FieldList(0, 1, 4));
+
+               final String[] forwarded = {"3->0", "5->1", "2->4"};
+               final SingleInputSemanticProperties semProps =
+                               SemanticPropUtil.getSemanticPropsSingleFromString(forwarded, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties filtered = requested.filterBySemanticProperties(semProps, 0);
+               final FieldSet sourceFields = filtered.getPartitionedFields();
+
+               Assert.assertTrue(sourceFields.size() == 3);
+               for (int expectedField : new int[] {3, 5, 2}) {
+                       Assert.assertTrue(sourceFields.contains(expectedField));
+               }
+               Assert.assertTrue(filtered.getPartitioning() == PartitioningProperty.HASH_PARTITIONED);
+       }
+
+       /**
+        * Identity forwarding ("n->n"): the requested partitioning fields (3, 5, 2)
+        * must come out of the filter unchanged.
+        */
+       @Test
+       public void requestedGlobalPropertiesFilterPartitioningPreservedTest2() {
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setHashPartitioned(new FieldList(3, 5, 2));
+
+               final String[] forwarded = {"3->3", "5->5", "2->2"};
+               final SingleInputSemanticProperties semProps =
+                               SemanticPropUtil.getSemanticPropsSingleFromString(forwarded, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties filtered = requested.filterBySemanticProperties(semProps, 0);
+               final FieldSet sourceFields = filtered.getPartitionedFields();
+
+               Assert.assertTrue(sourceFields.size() == 3);
+               for (int expectedField : new int[] {3, 5, 2}) {
+                       Assert.assertTrue(sourceFields.contains(expectedField));
+               }
+               Assert.assertTrue(filtered.getPartitioning() == PartitioningProperty.HASH_PARTITIONED);
+       }
+
+       /**
+        * Registers a handful of forwarded fields and checks that each destination
+        * field resolves to exactly the one source field it was registered with.
+        */
+       @Test
+       public void semanticPropertySourceFieldTest() {
+               // Each pair is {source field, destination field}.
+               final int[][] forwardings = {{2, 4}, {3, 8}, {4, 10}, {4, 11}};
+
+               final SingleInputSemanticProperties props = new SingleInputSemanticProperties();
+               for (int[] forwarding : forwardings) {
+                       props.addForwardedField(forwarding[0], forwarding[1]);
+               }
+
+               for (int[] forwarding : forwardings) {
+                       final int source = forwarding[0];
+                       final int destination = forwarding[1];
+                       Assert.assertTrue(props.getSourceField(0, destination).size() == 1);
+                       Assert.assertTrue(props.getSourceField(0, destination).contains(source));
+               }
+       }
+
+       /**
+        * An ordering requested on the output fields (2, 3, 0) must be translated to the
+        * input fields (6, 2, 5) they are forwarded from, keeping the ascending order.
+        */
+       @Test
+       public void requestedGlobalPropertiesFilterOrderingPreservedTest() {
+               final Ordering requestedOrder = new Ordering();
+               for (int outputField : new int[] {2, 3, 0}) {
+                       requestedOrder.appendOrdering(outputField, null, Order.ASCENDING);
+               }
+
+               final String[] forwarded = {"6->2", "2->3", "5->0"};
+               final SingleInputSemanticProperties semProps =
+                               SemanticPropUtil.getSemanticPropsSingleFromString(forwarded, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setOrdering(requestedOrder);
+
+               final RequestedGlobalProperties filtered = requested.filterBySemanticProperties(semProps, 0);
+
+               final FieldList involvedFields = filtered.getOrdering().getInvolvedIndexes();
+               final Order[] fieldOrders = filtered.getOrdering().getFieldOrders();
+
+               Assert.assertTrue(involvedFields.size() == 3);
+               for (int expectedField : new int[] {6, 2, 5}) {
+                       Assert.assertTrue(involvedFields.contains(expectedField));
+               }
+               for (int pos = 0; pos < 3; pos++) {
+                       Assert.assertTrue(fieldOrders[pos] == Order.ASCENDING);
+               }
+       }
+
+       /**
+        * The ordering is requested on the output fields (2, 3, 0), but no forwarding rule
+        * targets field 3; the filter is expected to return null.
+        */
+       @Test
+       public void requestedGlobalPropertiesFilterOrderingNothingPreservedTest() {
+               final Ordering requestedOrder = new Ordering();
+               for (int outputField : new int[] {2, 3, 0}) {
+                       requestedOrder.appendOrdering(outputField, null, Order.ASCENDING);
+               }
+
+               final String[] forwarded = {"6->2", "5->0"};
+               final SingleInputSemanticProperties semProps =
+                               SemanticPropUtil.getSemanticPropsSingleFromString(forwarded, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setOrdering(requestedOrder);
+
+               final RequestedGlobalProperties filtered = requested.filterBySemanticProperties(semProps, 0);
+
+               Assert.assertTrue(filtered == null);
+       }
+
+       /**
+        * Both join inputs are produced by a grouped reduce whose grouping key is forwarded
+        * to the field that is later used as join key (0->1 for the first input joined on
+        * field 1, 1->2 for the second input joined on field 2). The test checks that the
+        * optimizer uses FORWARD ship strategies on both join inputs.
+        */
+       @Test
+       public void forwardFieldsTestJoin() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               in1 = in1.map(new SimpleMap()).withConstantSet("*")
+                               .groupBy(0)
+                               .reduce(new SimpleReducer()).withConstantSet("0->1");
+               in2 = in2.map(new SimpleMap()).withConstantSet("*")
+                               .groupBy(1)
+                               .reduce(new SimpleReducer()).withConstantSet("1->2");
+               DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(
+                               new JoinFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
+                       @Override
+                       public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
+                               // Never invoked by this test: the plan is only compiled.
+                               return null;
+                       }
+               });
+
+               out.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               // Set by the visitor once the join node was checked. Without this flag the test
+               // would pass silently if the visitor never reached a join node.
+               final boolean[] joinVisited = {false};
+
+               oPlan.accept(new Visitor<PlanNode>() {
+                       @Override
+                       public boolean preVisit(PlanNode visitable) {
+                               if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) {
+                                       DualInputPlanNode node = ((DualInputPlanNode) visitable);
+                                       joinVisited[0] = true;
+
+                                       final Channel inConn1 = node.getInput1();
+                                       final Channel inConn2 = node.getInput2();
+
+                                       Assert.assertTrue("Join should just forward the input if it is already partitioned",
+                                                       inConn1.getShipStrategy() == ShipStrategyType.FORWARD);
+                                       Assert.assertTrue("Join should just forward the input if it is already partitioned",
+                                                       inConn2.getShipStrategy() == ShipStrategyType.FORWARD);
+                                       // The inputs of the join do not need to be inspected.
+                                       return false;
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void postVisit(PlanNode visitable) {
+                               // nothing to do after visiting a node
+                       }
+               });
+
+               Assert.assertTrue("The optimized plan should contain a join node", joinVisited[0]);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
index e9aa358..fe53380 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
@@ -58,7 +58,40 @@ public class DualInputSemanticProperties extends 
SemanticProperties {
        public DualInputSemanticProperties() {
                init();
        }
-       
+
+       /**
+        * Finds the fields of the first input that are forwarded to the given output field.
+        *
+        * @param dest The destination field in the output data.
+        * @return FieldSet containing the source fields of the first input, or null if
+        *         no field of the first input is forwarded to the destination field.
+        */
+       public FieldSet forwardedFrom1(int dest) {
+               return collectSourceFields(forwardedFields1, dest);
+       }
+
+       /**
+        * Finds the fields of the second input that are forwarded to the given output field.
+        *
+        * @param dest The destination field in the output data.
+        * @return FieldSet containing the source fields of the second input, or null if
+        *         no field of the second input is forwarded to the destination field.
+        */
+       public FieldSet forwardedFrom2(int dest) {
+               return collectSourceFields(forwardedFields2, dest);
+       }
+
+       /**
+        * Collects the keys (source fields) of all entries whose destination set contains
+        * the given field. Shared by {@link #forwardedFrom1(int)} and {@link #forwardedFrom2(int)}.
+        *
+        * @param forwardedFields Mapping from a source field to its destination fields.
+        * @param dest The destination field to look up.
+        * @return The matching source fields, or null if there is no match.
+        */
+       private static FieldSet collectSourceFields(Map<Integer, FieldSet> forwardedFields, int dest) {
+               FieldSet sources = null;
+               for (Map.Entry<Integer, FieldSet> entry : forwardedFields.entrySet()) {
+                       if (entry.getValue().contains(dest)) {
+                               if (sources == null) {
+                                       sources = new FieldSet();
+                               }
+                               // addField returns the set to continue with, so the result is re-assigned.
+                               sources = sources.addField(entry.getKey());
+                       }
+               }
+               return sources;
+       }
+
        /**
         * Adds, to the existing information, a field that is forwarded directly
         * from the source record(s) in the first input to the destination
@@ -115,6 +148,10 @@ public class DualInputSemanticProperties extends 
SemanticProperties {
         * @return the destination fields, or null if they do not exist
         */
        public FieldSet getForwardedField1(int sourceField) {
+               if (isAllFieldsConstant()) { // all fields constant: the field is reported as forwarded to the same position
+                       return new FieldSet(sourceField);
+               }
+
                return this.forwardedFields1.get(sourceField);
        }
        
@@ -174,9 +211,43 @@ public class DualInputSemanticProperties extends 
SemanticProperties {
         * @return the destination fields, or null if they do not exist
         */
        public FieldSet getForwardedField2(int sourceField) {
+               if (isAllFieldsConstant()) { // all fields constant: the field is reported as forwarded to the same position
+                       return new FieldSet(sourceField);
+               }
+
                return this.forwardedFields2.get(sourceField);
        }
-       
+
+       /**
+        * Returns the fields of the given input that are forwarded to the given output field.
+        * If all fields are marked constant, the output field itself is returned.
+        *
+        * @param input The input index, 0 for the first and 1 for the second input.
+        * @param field The destination field in the output data.
+        * @return The source fields, or null if no field of that input is forwarded to the output field.
+        * @throws IndexOutOfBoundsException if not all fields are constant and the input index is neither 0 nor 1.
+        */
+       @Override
+       public FieldSet getSourceField(int input, int field) {
+               if (isAllFieldsConstant()) {
+                       return new FieldSet(field);
+               }
+
+               switch(input) {
+                       case 0:
+                               return this.forwardedFrom1(field);
+                       case 1:
+                               return this.forwardedFrom2(field);
+                       default:
+                               throw new IndexOutOfBoundsException(
+                                               "Invalid input index " + input + ", a dual input operator only has the inputs 0 and 1.");
+               }
+       }
+
+       @Override
+       public FieldSet getForwardFields(int input, int field) {
+               if (isAllFieldsConstant()) {
+                       return new FieldSet(field);
+               }
+
+               if (input == 0) {
+                       return this.getForwardedField1(field);
+               } else if (input == 1) {
+                       return this.getForwardedField2(field);
+               }
+               return null;
+       }
+
        /**
         * Adds, to the existing information, field(s) that are read in
         * the source record(s) from the first input.
@@ -253,7 +324,7 @@ public class DualInputSemanticProperties extends 
SemanticProperties {
                super.clearProperties();
                init();
        }
-       
+
        @Override
        public boolean isEmpty() {
                return super.isEmpty() &&
@@ -263,7 +334,11 @@ public class DualInputSemanticProperties extends 
SemanticProperties {
                                (readFields2 == null || readFields2.size() == 
0);
        }
        
-       
+       /** Renders the forwarded-field maps of both inputs as "DISP(first; second)". */
+       @Override
+       public String toString() {
+               final StringBuilder text = new StringBuilder("DISP(");
+               text.append(this.forwardedFields1);
+               text.append("; ");
+               text.append(this.forwardedFields2);
+               text.append(")");
+               return text.toString();
+       }
+
        private void init() {
                this.forwardedFields1 = new HashMap<Integer,FieldSet>();
                this.forwardedFields2 = new HashMap<Integer,FieldSet>();

Reply via email to