http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..397dd3d
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class LocalPropertiesFilteringTest {
+
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, 
Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, 
Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       /**
+        * All three string arguments of the semantic-property parser are {@code null}; the
+        * grouping on (0, 1, 2) and the unique sets (3, 4) and (5, 6) are all expected to
+        * be gone after filtering.
+        */
+       @Test
+       public void testAllErased1() {
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, null, null, null, tupleInfo, tupleInfo);
+
+               // grouping plus two unique field sets, assembled in a single chain
+               final LocalProperties input = LocalProperties.forGrouping(new FieldList(0, 1, 2))
+                               .addUniqueFields(new FieldSet(3, 4))
+                               .addUniqueFields(new FieldSet(5, 6));
+
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+
+               assertNull(result.getGroupedFields());
+               assertNull(result.getOrdering());
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * The semantic-property string only names field 5, which none of the local
+        * properties use; grouping (0, 1, 2) and unique set (3, 4) are expected to be erased.
+        */
+       @Test
+       public void testAllErased2() {
+               final String[] fieldSpec = {"5"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = LocalProperties.forGrouping(new FieldList(0, 1, 2))
+                               .addUniqueFields(new FieldSet(3, 4));
+
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+
+               assertNull(result.getGroupedFields());
+               assertNull(result.getOrdering());
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * With the semantic-property string "0;2;3", the grouping on fields 0, 2 and 3 is
+        * expected to come back unchanged, with no ordering and no unique fields.
+        */
+       @Test
+       public void testGroupingPreserved1() {
+               final String[] fieldSpec = {"0;2;3"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+
+               final FieldList grouped = result.getGroupedFields();
+               assertNotNull(grouped);
+               assertEquals(3, grouped.size());
+               for (int field : new int[] {0, 2, 3}) {
+                       assertTrue(grouped.contains(field));
+               }
+               assertNull(result.getOrdering());
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * With the semantic-property string "0->4;2->0;3->7", the grouping on fields
+        * 0, 2 and 3 is expected to be reported on fields 4, 0 and 7 after filtering.
+        */
+       @Test
+       public void testGroupingPreserved2() {
+               final String[] fieldSpec = {"0->4;2->0;3->7"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+
+               final FieldList grouped = result.getGroupedFields();
+               assertNotNull(grouped);
+               assertEquals(3, grouped.size());
+               for (int field : new int[] {4, 0, 7}) {
+                       assertTrue(grouped.contains(field));
+               }
+               assertNull(result.getOrdering());
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * The semantic-property string "0->4;2->0" does not mention field 3, so the
+        * grouping on (0, 2, 3) is expected to be dropped entirely.
+        */
+       @Test
+       public void testGroupingErased() {
+               final String[] fieldSpec = {"0->4;2->0"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+
+               assertNull(result.getGroupedFields());
+               assertNull(result.getOrdering());
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * With the semantic-property string "0;2;5", an ordering on fields (2, 0, 5) is
+        * expected to survive filtering with the same fields, directions and types, and the
+        * grouped fields are expected to be 0, 2 and 5.
+        */
+       @Test
+       public void testSortingPreserved1() {
+               final String[] fieldSpec = {"0;2;5"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final Ordering inputOrder = new Ordering();
+               inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+               final LocalProperties result =
+                               LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+               final FieldList grouped = result.getGroupedFields();
+               final Ordering order = result.getOrdering();
+
+               assertNotNull(grouped);
+               assertEquals(3, grouped.size());
+               for (int field : new int[] {0, 2, 5}) {
+                       assertTrue(grouped.contains(field));
+               }
+
+               // expected ordering, position by position
+               final int[] expectedFields = {2, 0, 5};
+               final Order[] expectedOrders = {Order.ASCENDING, Order.DESCENDING, Order.DESCENDING};
+               final Class<?>[] expectedTypes = {IntValue.class, StringValue.class, LongValue.class};
+
+               assertNotNull(order);
+               assertEquals(3, order.getNumberOfFields());
+               for (int pos = 0; pos < expectedFields.length; pos++) {
+                       assertEquals(expectedFields[pos], order.getFieldNumber(pos).intValue());
+               }
+               for (int pos = 0; pos < expectedOrders.length; pos++) {
+                       assertEquals(expectedOrders[pos], order.getOrder(pos));
+               }
+               for (int pos = 0; pos < expectedTypes.length; pos++) {
+                       assertEquals(expectedTypes[pos], order.getType(pos));
+               }
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * With the semantic-property string "0->3;2->7;5->1", an ordering on fields
+        * (2, 0, 5) is expected to be reported on fields (7, 3, 1) with unchanged directions
+        * and types, and the grouped fields are expected to be 3, 7 and 1.
+        */
+       @Test
+       public void testSortingPreserved2() {
+               final String[] fieldSpec = {"0->3;2->7;5->1"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final Ordering inputOrder = new Ordering();
+               inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+               final LocalProperties result =
+                               LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+               final FieldList grouped = result.getGroupedFields();
+               final Ordering order = result.getOrdering();
+
+               assertNotNull(grouped);
+               assertEquals(3, grouped.size());
+               for (int field : new int[] {3, 7, 1}) {
+                       assertTrue(grouped.contains(field));
+               }
+
+               // expected ordering, position by position
+               final int[] expectedFields = {7, 3, 1};
+               final Order[] expectedOrders = {Order.ASCENDING, Order.DESCENDING, Order.DESCENDING};
+               final Class<?>[] expectedTypes = {IntValue.class, StringValue.class, LongValue.class};
+
+               assertNotNull(order);
+               assertEquals(3, order.getNumberOfFields());
+               for (int pos = 0; pos < expectedFields.length; pos++) {
+                       assertEquals(expectedFields[pos], order.getFieldNumber(pos).intValue());
+               }
+               for (int pos = 0; pos < expectedOrders.length; pos++) {
+                       assertEquals(expectedOrders[pos], order.getOrder(pos));
+               }
+               for (int pos = 0; pos < expectedTypes.length; pos++) {
+                       assertEquals(expectedTypes[pos], order.getType(pos));
+               }
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * The semantic-property string "0;2" omits field 5, the last field of the ordering
+        * (2, 0, 5); the filtered ordering is expected to keep only the first two fields
+        * (2, 0), and the grouped fields are expected to be 0 and 2.
+        */
+       @Test
+       public void testSortingPreserved3() {
+               final String[] fieldSpec = {"0;2"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final Ordering inputOrder = new Ordering();
+               inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+               final LocalProperties result =
+                               LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+               final FieldList grouped = result.getGroupedFields();
+               final Ordering order = result.getOrdering();
+
+               assertNotNull(grouped);
+               assertEquals(2, grouped.size());
+               for (int field : new int[] {0, 2}) {
+                       assertTrue(grouped.contains(field));
+               }
+
+               // expected ordering, position by position
+               final int[] expectedFields = {2, 0};
+               final Order[] expectedOrders = {Order.ASCENDING, Order.DESCENDING};
+               final Class<?>[] expectedTypes = {IntValue.class, StringValue.class};
+
+               assertNotNull(order);
+               assertEquals(2, order.getNumberOfFields());
+               for (int pos = 0; pos < expectedFields.length; pos++) {
+                       assertEquals(expectedFields[pos], order.getFieldNumber(pos).intValue());
+               }
+               for (int pos = 0; pos < expectedOrders.length; pos++) {
+                       assertEquals(expectedOrders[pos], order.getOrder(pos));
+               }
+               for (int pos = 0; pos < expectedTypes.length; pos++) {
+                       assertEquals(expectedTypes[pos], order.getType(pos));
+               }
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * The semantic-property string "2->7;5" omits field 0, the second field of the
+        * ordering (2, 0, 5); only the first ordering field is expected to remain, reported
+        * as field 7, and the grouped fields are expected to contain only 7.
+        */
+       @Test
+       public void testSortingPreserved4() {
+               final String[] fieldSpec = {"2->7;5"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final Ordering inputOrder = new Ordering();
+               inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+               final LocalProperties result =
+                               LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+               final FieldList grouped = result.getGroupedFields();
+               final Ordering order = result.getOrdering();
+
+               assertNotNull(grouped);
+               assertEquals(1, grouped.size());
+               assertTrue(grouped.contains(7));
+
+               assertNotNull(order);
+               assertEquals(1, order.getNumberOfFields());
+               assertEquals(7, order.getFieldNumber(0).intValue());
+               assertEquals(Order.ASCENDING, order.getOrder(0));
+               assertEquals(IntValue.class, order.getType(0));
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * The semantic-property string "0;5" omits field 2, the first field of the ordering
+        * (2, 0, 5); both the ordering and the grouped fields are expected to be erased.
+        */
+       @Test
+       public void testSortingErased() {
+               final String[] fieldSpec = {"0;5"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final Ordering inputOrder = new Ordering();
+               inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+               final LocalProperties result =
+                               LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+               final FieldList grouped = result.getGroupedFields();
+               final Ordering order = result.getOrdering();
+
+               assertNull(grouped);
+               assertNull(order);
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * With the semantic-property string "0;1;2;3;4", the unique sets (0, 1, 2) and
+        * (3, 4) are expected to survive, while (4, 5, 6) is expected to be dropped.
+        */
+       @Test
+       public void testUniqueFieldsPreserved1() {
+               final String[] fieldSpec = {"0;1;2;3;4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = new LocalProperties()
+                               .addUniqueFields(new FieldSet(0, 1, 2))
+                               .addUniqueFields(new FieldSet(3, 4))
+                               .addUniqueFields(new FieldSet(4, 5, 6));
+
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+               final FieldSet firstSurvivor = new FieldSet(0, 1, 2);
+               final FieldSet secondSurvivor = new FieldSet(3, 4);
+
+               assertNull(result.getGroupedFields());
+               assertNull(result.getOrdering());
+               assertNotNull(result.getUniqueFields());
+               assertEquals(2, result.getUniqueFields().size());
+               assertTrue(result.getUniqueFields().contains(firstSurvivor));
+               assertTrue(result.getUniqueFields().contains(secondSurvivor));
+       }
+
+       /**
+        * Same unique sets as {@code testUniqueFieldsPreserved1}, but on top of a grouping
+        * on (1, 2). With the semantic-property string "0;1;2;3;4" the grouping and the
+        * unique sets (0, 1, 2) and (3, 4) are expected to survive.
+        */
+       @Test
+       public void testUniqueFieldsPreserved2() {
+               final String[] fieldSpec = {"0;1;2;3;4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = LocalProperties.forGrouping(new FieldList(1, 2))
+                               .addUniqueFields(new FieldSet(0, 1, 2))
+                               .addUniqueFields(new FieldSet(3, 4))
+                               .addUniqueFields(new FieldSet(4, 5, 6));
+
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+               final FieldSet firstSurvivor = new FieldSet(0, 1, 2);
+               final FieldSet secondSurvivor = new FieldSet(3, 4);
+
+               assertNull(result.getOrdering());
+               assertNotNull(result.getGroupedFields());
+               assertEquals(2, result.getGroupedFields().size());
+               for (int field : new int[] {1, 2}) {
+                       assertTrue(result.getGroupedFields().contains(field));
+               }
+               assertNotNull(result.getUniqueFields());
+               assertEquals(2, result.getUniqueFields().size());
+               assertTrue(result.getUniqueFields().contains(firstSurvivor));
+               assertTrue(result.getUniqueFields().contains(secondSurvivor));
+       }
+
+       /**
+        * With the semantic-property string "0->7;1->6;2->5;3->4;4->3", the unique set
+        * (0, 1, 2) is expected to be reported as (5, 6, 7) and (3, 4) as (3, 4); the set
+        * (4, 5, 6) is expected to be dropped.
+        */
+       @Test
+       public void testUniqueFieldsPreserved3() {
+               final String[] fieldSpec = {"0->7;1->6;2->5;3->4;4->3"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = new LocalProperties()
+                               .addUniqueFields(new FieldSet(0, 1, 2))
+                               .addUniqueFields(new FieldSet(3, 4))
+                               .addUniqueFields(new FieldSet(4, 5, 6));
+
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+               final FieldSet firstSurvivor = new FieldSet(5, 6, 7);
+               final FieldSet secondSurvivor = new FieldSet(3, 4);
+
+               assertNull(result.getGroupedFields());
+               assertNull(result.getOrdering());
+               assertNotNull(result.getUniqueFields());
+               assertEquals(2, result.getUniqueFields().size());
+               assertTrue(result.getUniqueFields().contains(firstSurvivor));
+               assertTrue(result.getUniqueFields().contains(secondSurvivor));
+       }
+
+       /**
+        * The semantic-property string "0;1;4" covers none of the unique sets (0, 1, 2),
+        * (3, 4) and (4, 5, 6) completely; all unique fields are expected to be erased.
+        */
+       @Test
+       public void testUniqueFieldsErased() {
+               final String[] fieldSpec = {"0;1;4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties input = new LocalProperties()
+                               .addUniqueFields(new FieldSet(0, 1, 2))
+                               .addUniqueFields(new FieldSet(3, 4))
+                               .addUniqueFields(new FieldSet(4, 5, 6));
+
+               final LocalProperties result = input.filterBySemanticProperties(semProps, 0);
+
+               assertNull(result.getGroupedFields());
+               assertNull(result.getOrdering());
+               assertNull(result.getUniqueFields());
+       }
+
+       /**
+        * Filtering single-input semantic properties with input index 1 is expected to
+        * raise an {@link IndexOutOfBoundsException}.
+        */
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidInputIndex() {
+               final String[] fieldSpec = {"0;1"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final LocalProperties grouping = LocalProperties.forGrouping(new FieldList(0, 1));
+
+               // index 1 is the invalid input under test
+               grouping.filterBySemanticProperties(semProps, 1);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java
new file mode 100644
index 0000000..2f438cf
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+
+import java.io.IOException;
+
+/**
+ * Stub {@link DataDistribution} for tests: it reports zero fields, returns an empty
+ * boundary array for every bucket, and neither writes nor reads any state.
+ */
+public class MockDistribution implements DataDistribution {
+
+       // Declared for consistency with MockPartitioner in this package.
+       // NOTE(review): assumes DataDistribution is Serializable - confirm against the interface.
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Returns an empty boundary array regardless of the arguments.
+        *
+        * @param bucketNum ignored
+        * @param totalNumBuckets ignored
+        * @return a new zero-length {@code Key} array
+        */
+       @Override
+       public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+               return new Key<?>[0];
+       }
+
+       /**
+        * @return always {@code 0}
+        */
+       @Override
+       public int getNumberOfFields() {
+               return 0;
+       }
+
+       /**
+        * Writes nothing; the stub carries no state.
+        */
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               // intentionally empty
+       }
+
+       /**
+        * Reads nothing; the stub carries no state.
+        */
+       @Override
+       public void read(DataInputView in) throws IOException {
+               // intentionally empty
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
index 71e4c3a..1245398 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
@@ -19,13 +19,14 @@
 package org.apache.flink.compiler.dataproperties;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
 
-class MockPartitioner implements Partitioner<Long> {
+class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> { // test stub; key type is now Tuple2<Long, Integer>
        
        private static final long serialVersionUID = 1L;
 
        @Override
-       public int partition(Long key, int numPartitions) {
+       public int partition(Tuple2<Long, Integer> key, int numPartitions) { // ignores both arguments; always returns partition 0
                return 0;
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..f9acabb
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class RequestedGlobalPropertiesFilteringTest {
+
+       /**
+        * Type information for a tuple of eight {@code Integer} fields. The tests pass it as
+        * the last two arguments of {@code SemanticPropUtil.getSemanticPropsSingleFromString}.
+        */
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       /**
+        * Passing {@code null} as the semantic properties is expected to raise a
+        * {@link NullPointerException}.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullProps() {
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+               // only the thrown exception matters, so the return value is not stored
+               rgProps.filterBySemanticProperties(null, 0);
+       }
+
+       /**
+        * Filters an any-partitioning request on (0, 1, 2) through freshly constructed,
+        * unconfigured semantic properties; the filtered request is expected to be
+        * {@code null}.
+        */
+       @Test
+       public void testEraseAll1() {
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setAnyPartitioning(new FieldSet(0, 1, 2));
+
+               final RequestedGlobalProperties result = requested.filterBySemanticProperties(semProps, 0);
+
+               assertNull(result);
+       }
+
+       /**
+        * The semantic-property string "3;4" names none of the requested partitioning
+        * fields (0, 1, 2); the filtered request is expected to be {@code null}.
+        */
+       @Test
+       public void testEraseAll2() {
+               final String[] fieldSpec = {"3;4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setAnyPartitioning(new FieldSet(0, 1, 2));
+
+               final RequestedGlobalProperties result = requested.filterBySemanticProperties(semProps, 0);
+
+               assertNull(result);
+       }
+
+       /**
+        * With the semantic-property string "0;3;4", a hash-partitioning request on
+        * (0, 3, 4) is expected to come back unchanged, with no data distribution, custom
+        * partitioner or ordering.
+        */
+       @Test
+       public void testHashPartitioningPreserved1() {
+               final String[] fieldSpec = {"0;3;4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setHashPartitioned(new FieldSet(0, 3, 4));
+
+               final RequestedGlobalProperties result = requested.filterBySemanticProperties(semProps, 0);
+
+               assertNotNull(result);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+               assertNotNull(result.getPartitionedFields());
+               assertEquals(3, result.getPartitionedFields().size());
+               for (int field : new int[] {0, 3, 4}) {
+                       assertTrue(result.getPartitionedFields().contains(field));
+               }
+               assertNull(result.getDataDistribution());
+               assertNull(result.getCustomPartitioner());
+               assertNull(result.getOrdering());
+       }
+
+       /**
+        * With the semantic-property string "2->0;1->3;7->4", a hash-partitioning request
+        * on (0, 3, 4) is expected to be reported on fields 1, 2 and 7 after filtering.
+        */
+       @Test
+       public void testHashPartitioningPreserved2() {
+               final String[] fieldSpec = {"2->0;1->3;7->4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setHashPartitioned(new FieldSet(0, 3, 4));
+
+               final RequestedGlobalProperties result = requested.filterBySemanticProperties(semProps, 0);
+
+               assertNotNull(result);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+               assertNotNull(result.getPartitionedFields());
+               assertEquals(3, result.getPartitionedFields().size());
+               for (int field : new int[] {1, 2, 7}) {
+                       assertTrue(result.getPartitionedFields().contains(field));
+               }
+               assertNull(result.getDataDistribution());
+               assertNull(result.getCustomPartitioner());
+               assertNull(result.getOrdering());
+       }
+
+       /**
+        * The semantic-property string "1;2" names none of the hash-partitioning fields
+        * (0, 3, 4); the filtered request is expected to be {@code null}.
+        */
+       @Test
+       public void testHashPartitioningErased() {
+               final String[] fieldSpec = {"1;2"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setHashPartitioned(new FieldSet(0, 3, 4));
+
+               final RequestedGlobalProperties result = requested.filterBySemanticProperties(semProps, 0);
+
+               assertNull(result);
+       }
+
+       /**
+        * With the semantic-property string "0;3;4", an any-partitioning request on
+        * (0, 3, 4) is expected to come back unchanged, with no data distribution, custom
+        * partitioner or ordering.
+        */
+       @Test
+       public void testAnyPartitioningPreserved1() {
+               final String[] fieldSpec = {"0;3;4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+               final RequestedGlobalProperties result = requested.filterBySemanticProperties(semProps, 0);
+
+               assertNotNull(result);
+               assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
+               assertNotNull(result.getPartitionedFields());
+               assertEquals(3, result.getPartitionedFields().size());
+               for (int field : new int[] {0, 3, 4}) {
+                       assertTrue(result.getPartitionedFields().contains(field));
+               }
+               assertNull(result.getDataDistribution());
+               assertNull(result.getCustomPartitioner());
+               assertNull(result.getOrdering());
+       }
+
+       /**
+        * With the semantic-property string "2->0;1->3;7->4", an any-partitioning request
+        * on (0, 3, 4) is expected to be reported on fields 1, 2 and 7 after filtering.
+        */
+       @Test
+       public void testAnyPartitioningPreserved2() {
+               final String[] fieldSpec = {"2->0;1->3;7->4"};
+               final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, fieldSpec, null, null, tupleInfo, tupleInfo);
+
+               final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+               requested.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+               final RequestedGlobalProperties result = requested.filterBySemanticProperties(semProps, 0);
+
+               assertNotNull(result);
+               assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
+               assertNotNull(result.getPartitionedFields());
+               assertEquals(3, result.getPartitionedFields().size());
+               for (int field : new int[] {1, 2, 7}) {
+                       assertTrue(result.getPartitionedFields().contains(field));
+               }
+               assertNull(result.getDataDistribution());
+               assertNull(result.getCustomPartitioner());
+               assertNull(result.getOrdering());
+       }
+
+       @Test
+       public void testAnyPartitioningErased() {
+               // Spec "1;2" mentions none of the requested partitioning fields 0, 3, 4.
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: filtering yields null, i.e. no request remains.
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testRangePartitioningPreserved1() {
+               // Spec "1;3;6" combined with a range-partitioning request ordered on fields 3, 1, 6.
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"1;3;6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o);
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: field numbers, types and sort directions of the ordering are all unchanged.
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, 
filtered.getPartitioning());
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(3, 
filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, 
filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(6, 
filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, 
filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, 
filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, 
filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getPartitionedFields());
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+       }
+
+       @Test
+       public void testRangePartitioningPreserved2() {
+               // Spec "7->3;1->1;2->6" combined with a range-partitioning request ordered on fields 3, 1, 6.
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o);
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: ordering fields become 7, 1, 2 (same positions), with types and directions kept.
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, 
filtered.getPartitioning());
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(7, 
filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, 
filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(2, 
filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, 
filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, 
filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, 
filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getPartitionedFields());
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+       }
+
+       @Test
+       public void testRangePartitioningPreserved3() {
+               // Same spec and ordering as testRangePartitioningPreserved2, plus a MockDistribution on the request.
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+               DataDistribution dd = new MockDistribution();
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o, dd);
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: ordering fields become 7, 1, 2 and the data distribution equals the one that was set.
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, 
filtered.getPartitioning());
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(7, 
filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, 
filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(2, 
filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, 
filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, 
filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, 
filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(2));
+               assertNotNull(filtered.getDataDistribution());
+               assertEquals(dd, filtered.getDataDistribution());
+               assertNull(filtered.getPartitionedFields());
+               assertNull(filtered.getCustomPartitioner());
+       }
+
+       @Test
+       public void testRangePartitioningErased() {
+               // Spec "1;2" mentions only one (field 1) of the three ordering fields 3, 1, 6.
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o);
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: filtering yields null, i.e. no request remains.
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testCustomPartitioningErased() {
+               // Spec "0;1;2" mentions every custom-partitioned field (0, 1, 2); the test still expects null.
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new 
MockPartitioner());
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: a custom-partitioning request does not survive filtering.
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testRandomDistributionErased() {
+               // Spec "0;1;2" combined with a random-distribution request (no fields involved).
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setRandomDistribution();
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: a random-distribution request does not survive filtering.
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testReplicationErased() {
+               // Spec "0;1;2" combined with a full-replication request (no fields involved).
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setFullyReplicated();
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: a full-replication request does not survive filtering.
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testRebalancingErased() {
+               // Spec "0;1;2" combined with a forced-rebalancing request (no fields involved).
+               SingleInputSemanticProperties sProp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new 
String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new 
RequestedGlobalProperties();
+               rgProps.setForceRebalancing();
+
+               RequestedGlobalProperties filtered = 
rgProps.filterBySemanticProperties(sProp, 0);
+               // Expected: a forced-rebalancing request does not survive filtering.
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testDualHashPartitioningPreserved() {
+               // Two-input case: spec "0;2;4" for the first input, "1->3;4->6;3->7" for the second.
+               DualInputSemanticProperties dprops = new 
DualInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsDualFromString(dprops, new 
String[]{"0;2;4"}, new String[]{"1->3;4->6;3->7"},
+                               null, null, null, null, tupleInfo, tupleInfo, 
tupleInfo);
+
+               RequestedGlobalProperties gprops1 = new 
RequestedGlobalProperties();
+               RequestedGlobalProperties gprops2 = new 
RequestedGlobalProperties();
+               gprops1.setHashPartitioned(new FieldSet(2, 0, 4));
+               gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
+               RequestedGlobalProperties filtered1 = 
gprops1.filterBySemanticProperties(dprops, 0);
+               RequestedGlobalProperties filtered2 = 
gprops2.filterBySemanticProperties(dprops, 1);
+               // Input 0: hash partitioning on 0, 2, 4 is expected to stay on the same fields.
+               assertNotNull(filtered1);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, 
filtered1.getPartitioning());
+               assertNotNull(filtered1.getPartitionedFields());
+               assertEquals(3, filtered1.getPartitionedFields().size());
+               assertTrue(filtered1.getPartitionedFields().contains(0));
+               assertTrue(filtered1.getPartitionedFields().contains(2));
+               assertTrue(filtered1.getPartitionedFields().contains(4));
+               assertNull(filtered1.getOrdering());
+               assertNull(filtered1.getCustomPartitioner());
+               assertNull(filtered1.getDataDistribution());
+               // Input 1: requested fields 3, 6, 7 are expected to become the spec's left-hand fields 1, 4, 3.
+               assertNotNull(filtered2);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, 
filtered2.getPartitioning());
+               assertNotNull(filtered2.getPartitionedFields());
+               assertEquals(3, filtered2.getPartitionedFields().size());
+               assertTrue(filtered2.getPartitionedFields().contains(1));
+               assertTrue(filtered2.getPartitionedFields().contains(3));
+               assertTrue(filtered2.getPartitionedFields().contains(4));
+               assertNull(filtered2.getOrdering());
+               assertNull(filtered2.getCustomPartitioner());
+               assertNull(filtered2.getDataDistribution());
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidInputIndex() {
+               SingleInputSemanticProperties sprops = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new 
String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties gprops = new 
RequestedGlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0,1));
+               // Filtering single-input properties with input index 1 must throw (see the expected exception above).
+               gprops.filterBySemanticProperties(sprops, 1);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..f3aaf05
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RequestedLocalPropertiesFilteringTest {
+       // Tests RequestedLocalProperties.filterBySemanticProperties for grouping and ordering requests.
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, 
Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, 
Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO
+                       ); // eight INT fields; passed as the type information to every SemanticPropUtil call below
+
+       @Test(expected = NullPointerException.class)
+       public void testNullProps() {
+               // Null semantic properties must raise a NullPointerException; the result is never inspected.
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(null, 0);
+       }
+
+       @Test
+       public void testAllErased() {
+               // Freshly constructed semantic properties (no spec parsed): the grouping request is expected to vanish.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testGroupingPreserved1() {
+               // Spec "0;2;3" with grouping on 0, 2, 3: the same three grouped fields are expected afterwards.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new 
String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getGroupedFields());
+               assertEquals(3, filtered.getGroupedFields().size());
+               assertTrue(filtered.getGroupedFields().contains(0));
+               assertTrue(filtered.getGroupedFields().contains(2));
+               assertTrue(filtered.getGroupedFields().contains(3));
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testGroupingPreserved2() {
+               // Spec "3->0;5->2;1->3" with grouping on 0, 2, 3: grouped fields are expected to become 3, 5, 1.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new 
String[]{"3->0;5->2;1->3"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getGroupedFields());
+               assertEquals(3, filtered.getGroupedFields().size());
+               assertTrue(filtered.getGroupedFields().contains(3));
+               assertTrue(filtered.getGroupedFields().contains(5));
+               assertTrue(filtered.getGroupedFields().contains(1));
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testGroupingErased() {
+               // Spec "0;2" lacks grouped field 3: the whole grouping request is expected to vanish.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new 
String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testOrderPreserved1() {
+               // Spec "1;4;6" with an ordering on 4, 1, 6: field numbers, types and directions are expected unchanged.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new 
String[]{"1;4;6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setOrdering(o);
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(4, 
filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, 
filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(6, 
filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, 
filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, 
filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, 
filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getGroupedFields());
+       }
+
+       @Test
+       public void testOrderPreserved2() {
+               // Spec "5->1;0->4;2->6" with an ordering on 4, 1, 6: ordering fields are expected to become 0, 5, 2.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new 
String[]{"5->1;0->4;2->6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setOrdering(o);
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(0, 
filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(5, 
filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(2, 
filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, 
filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, 
filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, 
filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, 
filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getGroupedFields());
+       }
+
+       @Test
+       public void testOrderErased() {
+               // Spec "1; 4" lacks ordering field 6: the whole ordering request is expected to vanish.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new 
String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setOrdering(o);
+
+               RequestedLocalProperties filtered = 
rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testDualGroupingPreserved() {
+               // Two-input case: spec "1->0;3;2->4" for the first input, "0->7;1" for the second.
+               DualInputSemanticProperties dprops = new 
DualInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsDualFromString(dprops, new 
String[]{"1->0;3;2->4"}, new String[]{"0->7;1"},
+                               null, null, null, null, tupleInfo, tupleInfo, 
tupleInfo);
+
+               RequestedLocalProperties lprops1 = new 
RequestedLocalProperties();
+               lprops1.setGroupedFields(new FieldSet(0,3,4));
+
+               RequestedLocalProperties lprops2 = new 
RequestedLocalProperties();
+               lprops2.setGroupedFields(new FieldSet(7, 1));
+
+               RequestedLocalProperties filtered1 = 
lprops1.filterBySemanticProperties(dprops, 0);
+               RequestedLocalProperties filtered2 = 
lprops2.filterBySemanticProperties(dprops, 1);
+               // Input 0: grouped fields 0, 3, 4 are expected to become 1, 3, 2.
+               assertNotNull(filtered1);
+               assertNotNull(filtered1.getGroupedFields());
+               assertEquals(3, filtered1.getGroupedFields().size());
+               assertTrue(filtered1.getGroupedFields().contains(1));
+               assertTrue(filtered1.getGroupedFields().contains(2));
+               assertTrue(filtered1.getGroupedFields().contains(3));
+               assertNull(filtered1.getOrdering());
+               // Input 1: grouped fields 7, 1 are expected to become 0, 1.
+               assertNotNull(filtered2);
+               assertNotNull(filtered2.getGroupedFields());
+               assertEquals(2, filtered2.getGroupedFields().size());
+               assertTrue(filtered2.getGroupedFields().contains(0));
+               assertTrue(filtered2.getGroupedFields().contains(1));
+               assertNull(filtered2.getOrdering());
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidInputIndex() {
+               // Filtering single-input properties with input index 1 must throw IndexOutOfBoundsException.
+               SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new 
String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new 
RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(1, 4));
+
+               rlProp.filterBySemanticProperties(sProps, 1);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
index 4d81e0b..3b9a7a2 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
@@ -278,19 +278,19 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                                }
                        })
                        .name(JOIN_WITH_SOLUTION_SET)
-                       .withConstantSetSecond(joinPreservesSolutionSet ? new 
String[] {"0->0", "1->1", "2->2" } : null);
+                       .withForwardedFieldsSecond(joinPreservesSolutionSet ? 
new String[] {"0->0", "1->1", "2->2" } : null);
                        
                DataSet<Tuple3<Long, Long, Long>> nextWorkset = 
joinedWithSolutionSet.groupBy(1, 2)
                        .reduceGroup(new 
RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
                                public void reduce(Iterable<Tuple3<Long, Long, 
Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
                        })
                        .name(NEXT_WORKSET_REDUCER_NAME)
-                       .withConstantSet("1->1","2->2","0->0");
+                       .withForwardedFields("1->1","2->2","0->0");
                
                
                DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = 
mapBeforeSolutionDelta ?
                                joinedWithSolutionSet.map(new 
RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public 
Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } 
})
-                                       
.name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") :
+                                       
.name(SOLUTION_DELTA_MAPPER_NAME).withForwardedFields("0->0","1->1","2->2") :
                                joinedWithSolutionSet;
                
                iter.closeWith(nextSolutionSet, nextWorkset)

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
index 6325788..9cdea6d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
@@ -59,7 +59,7 @@ public abstract class DualInputOperator<IN1, IN2, OUT, FT 
extends Function> exte
        /**
         * Semantic properties of the associated function.
         */
-       private DualInputSemanticProperties semanticProperties;
+       private DualInputSemanticProperties semanticProperties = new 
DualInputSemanticProperties();
        
        // 
--------------------------------------------------------------------------------------------
        

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
index fe53380..70fd753 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
@@ -27,20 +27,20 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 /**
  * Container for the semantic properties associated to a dual input operator.
  */
-public class DualInputSemanticProperties extends SemanticProperties {
+public class DualInputSemanticProperties implements SemanticProperties {
        private static final long serialVersionUID = 1L;
-       
+
        /**
         * Mapping from fields in the source record(s) in the first input to 
fields
         * in the destination record(s).  
         */
-       private Map<Integer,FieldSet> forwardedFields1;
+       private Map<Integer,FieldSet> fieldMapping1;
        
        /**
         * Mapping from fields in the source record(s) in the second input to 
fields
         * in the destination record(s).  
         */
-       private Map<Integer,FieldSet> forwardedFields2;
+       private Map<Integer,FieldSet> fieldMapping2;
        
        /**
         * Set of fields that are read in the source record(s) from the
@@ -56,293 +56,124 @@ public class DualInputSemanticProperties extends 
SemanticProperties {
 
        
        public DualInputSemanticProperties() {
-               init();
+               this.fieldMapping1 = new HashMap<Integer,FieldSet>();
+               this.fieldMapping2 = new HashMap<Integer,FieldSet>();
+               this.readFields1 = null;
+               this.readFields2 = null;
        }
 
-       /**
-        * Finds the source field where the given field was forwarded from.
-        * @param dest The destination field in the output data.
-        * @return FieldSet containing the source input fields.
-        */
-       public FieldSet forwardedFrom1(int dest) {
-               FieldSet fs = null;
-               for (Map.Entry<Integer, FieldSet> entry : 
forwardedFields1.entrySet()) {
-                       if (entry.getValue().contains(dest)) {
-                               if (fs == null) {
-                                       fs = new FieldSet();
-                               }
+       @Override
+       public FieldSet getForwardingTargetFields(int input, int sourceField) {
 
-                               fs = fs.addField(entry.getKey());
-                       }
+               if (input != 0 && input != 1) {
+                       throw new IndexOutOfBoundsException();
+               } else if (input == 0) {
+
+                       return fieldMapping1.containsKey(sourceField) ? 
fieldMapping1.get(sourceField) : FieldSet.EMPTY_SET;
+               } else {
+                       return fieldMapping2.containsKey(sourceField) ? 
fieldMapping2.get(sourceField) : FieldSet.EMPTY_SET;
                }
-               return fs;
        }
 
-       public FieldSet forwardedFrom2(int dest) {
-               FieldSet fs = null;
-               for (Map.Entry<Integer, FieldSet> entry : 
forwardedFields2.entrySet()) {
-                       if (entry.getValue().contains(dest)) {
-                               if (fs == null) {
-                                       fs = new FieldSet();
-                               }
+       @Override
+       public int getForwardingSourceField(int input, int targetField) {
+               Map<Integer, FieldSet> fieldMapping;
+
+               if (input != 0 && input != 1) {
+                       throw new IndexOutOfBoundsException();
+               } else if (input == 0) {
+                       fieldMapping = fieldMapping1;
+               } else {
+                       fieldMapping = fieldMapping2;
+               }
 
-                               fs = fs.addField(entry.getKey());
+               for (Map.Entry<Integer, FieldSet> e : fieldMapping.entrySet()) {
+                       if (e.getValue().contains(targetField)) {
+                               return e.getKey();
                        }
                }
-               return fs;
+               return -1;
        }
 
-       /**
-        * Adds, to the existing information, a field that is forwarded directly
-        * from the source record(s) in the first input to the destination
-        * record(s).
-        * 
-        * @param sourceField the position in the source record(s) from the 
first input
-        * @param destinationField the position in the destination record(s)
-        */
-       public void addForwardedField1(int sourceField, int destinationField) {
-               FieldSet old = this.forwardedFields1.get(sourceField);
-               if (old == null) {
-                       old = FieldSet.EMPTY_SET;
+       @Override
+       public FieldSet getReadFields(int input) {
+               if (input != 0 && input != 1) {
+                       throw new IndexOutOfBoundsException();
                }
-               
-               FieldSet fs = old.addField(destinationField);
-               this.forwardedFields1.put(sourceField, fs);
-       }
-       
-       /**
-        * Adds, to the existing information, a field that is forwarded directly
-        * from the source record(s) in the first input to multiple fields in
-        * the destination record(s).
-        * 
-        * @param sourceField the position in the source record(s)
-        * @param destinationFields the position in the destination record(s)
-        */
-       public void addForwardedField1(int sourceField, FieldSet 
destinationFields) {
-               FieldSet old = this.forwardedFields1.get(sourceField);
-               if (old == null) {
-                       old = FieldSet.EMPTY_SET;
+
+               if (input == 0) {
+                       return readFields1;
+               } else {
+                       return readFields2;
                }
-               
-               FieldSet fs = old.addFields(destinationFields);
-               this.forwardedFields1.put(sourceField, fs);
        }
-       
-       /**
-        * Sets a field that is forwarded directly from the source
-        * record(s) in the first input to multiple fields in the
-        * destination record(s).
-        * 
-        * @param sourceField the position in the source record(s)
-        * @param destinationFields the position in the destination record(s)
-        */
-       public void setForwardedField1(int sourceField, FieldSet 
destinationFields) {
-               this.forwardedFields1.put(sourceField, destinationFields);
-       }
-       
-       /**
-        * Gets the fields in the destination record where the source
-        * field from the first input is forwarded.
-        * 
-        * @param sourceField the position in the source record
-        * @return the destination fields, or null if they do not exist
-        */
-       public FieldSet getForwardedField1(int sourceField) {
-               if (isAllFieldsConstant()) {
-                       return new FieldSet(sourceField);
-               }
 
-               return this.forwardedFields1.get(sourceField);
-       }
-       
        /**
         * Adds, to the existing information, a field that is forwarded directly
-        * from the source record(s) in the second input to the destination
+        * from the source record(s) in the given input to the destination
         * record(s).
-        * 
-        * @param sourceField the position in the source record(s) from the 
first input
-        * @param destinationField the position in the destination record(s)
-        */
-       public void addForwardedField2(int sourceField, int destinationField) {
-               FieldSet old = this.forwardedFields2.get(sourceField);
-               if (old == null) {
-                       old = FieldSet.EMPTY_SET;
-               }
-               
-               FieldSet fs = old.addField(destinationField);
-               this.forwardedFields2.put(sourceField, fs);
-       }
-       
-       /**
-        * Adds, to the existing information, a field that is forwarded directly
-        * from the source record(s) in the second input to multiple fields in
-        * the destination record(s).
-        * 
-        * @param sourceField the position in the source record(s)
-        * @param destinationFields the position in the destination record(s)
-        */
-       public void addForwardedField2(int sourceField, FieldSet 
destinationFields) {
-               FieldSet old = this.forwardedFields2.get(sourceField);
-               if (old == null) {
-                       old = FieldSet.EMPTY_SET;
-               }
-               
-               FieldSet fs = old.addFields(destinationFields);
-               this.forwardedFields2.put(sourceField, fs);
-       }
-       
-       /**
-        * Sets a field that is forwarded directly from the source
-        * record(s) in the second input to multiple fields in the
-        * destination record(s).
-        * 
-        * @param sourceField the position in the source record(s)
-        * @param destinationFields the position in the destination record(s)
-        */
-       public void setForwardedField2(int sourceField, FieldSet 
destinationFields) {
-               this.forwardedFields2.put(sourceField, destinationFields);
-       }
-       
-       /**
-        * Gets the fields in the destination record where the source
-        * field from the second input is forwarded.
-        * 
+        *
+        * @param input the input of the source field
         * @param sourceField the position in the source record
-        * @return the destination fields, or null if they do not exist
+        * @param targetField the position in the destination record
         */
-       public FieldSet getForwardedField2(int sourceField) {
-               if (isAllFieldsConstant()) {
-                       return new FieldSet(sourceField);
-               }
+       public void addForwardedField(int input, int sourceField, int 
targetField) {
 
-               return this.forwardedFields2.get(sourceField);
-       }
+               Map<Integer, FieldSet> fieldMapping;
 
-       @Override
-       public FieldSet getSourceField(int input, int field) {
-               if (isAllFieldsConstant()) {
-                       return new FieldSet(field);
+               if (input != 0 && input != 1) {
+                       throw new IndexOutOfBoundsException();
+               } else if (input == 0) {
+                       fieldMapping = this.fieldMapping1;
+               } else {
+                       fieldMapping = this.fieldMapping2;
                }
 
-               switch(input) {
-                       case 0:
-                               return this.forwardedFrom1(field);
-                       case 1:
-                               return this.forwardedFrom2(field);
-                       default:
-                               throw new IndexOutOfBoundsException();
+               if(isTargetFieldPresent(targetField, fieldMapping)) {
+                       throw new InvalidSemanticAnnotationException("Target 
field "+targetField+" was added twice to input "+input);
                }
-       }
 
-       @Override
-       public FieldSet getForwardFields(int input, int field) {
-               if (isAllFieldsConstant()) {
-                       return new FieldSet(field);
+               FieldSet targetFields = fieldMapping.get(sourceField);
+               if (targetFields != null) {
+                       fieldMapping.put(sourceField, 
targetFields.addField(targetField));
+               } else {
+                       fieldMapping.put(sourceField, new 
FieldSet(targetField));
                }
+       }
 
-               if (input == 0) {
-                       return this.getForwardedField1(field);
-               } else if (input == 1) {
-                       return this.getForwardedField2(field);
+       private boolean isTargetFieldPresent(int targetField, Map<Integer, 
FieldSet> fieldMapping) {
+
+               for(FieldSet targetFields : fieldMapping.values()) {
+                       if(targetFields.contains(targetField)) {
+                               return true;
+                       }
                }
-               return null;
+               return false;
        }
 
        /**
         * Adds, to the existing information, field(s) that are read in
         * the source record(s) from the first input.
-        * 
-        * @param readFields the position(s) in the source record(s)
-        */
-       public void addReadFields1(FieldSet readFields) {
-               if (this.readFields1 == null) {
-                       this.readFields1 = readFields;
-               } else {
-                       this.readFields1 = 
this.readFields2.addFields(readFields);
-               }
-       }
-       
-       /**
-        * Sets the field(s) that are read in the source record(s) from the 
first
-        * input.
-        * 
-        * @param readFields the position(s) in the source record(s)
-        */
-       public void setReadFields1(FieldSet readFields) {
-               this.readFields1 = readFields;
-       }
-       
-       /**
-        * Gets the field(s) in the source record(s) from the first input
-        * that are read.
-        * 
-        * @return the field(s) in the record, or null if they are not set
-        */
-       public FieldSet getReadFields1() {
-               return this.readFields1;
-       }
-       
-       /**
-        * Adds, to the existing information, field(s) that are read in
-        * the source record(s) from the second input.
-        * 
+        *
+        * @param input the input of the read fields
         * @param readFields the position(s) in the source record(s)
         */
-       public void addReadFields2(FieldSet readFields) {
-               if (this.readFields2 == null) {
-                       this.readFields2 = readFields;
+       public void addReadFields(int input, FieldSet readFields) {
+
+               // A null read set means nothing was recorded for that input yet,
+               // so the first call stores a copy instead of merging into it.
+               if (input != 0 && input != 1) {
+                       throw new IndexOutOfBoundsException();
+               } else if (input == 0) {
+                       this.readFields1 = (this.readFields1 == null) ? 
readFields.clone() : this.readFields1.addFields(readFields);
                } else {
-                       this.readFields2 = 
this.readFields2.addFields(readFields);
+                       this.readFields2 = (this.readFields2 == null) ? 
readFields.clone() : this.readFields2.addFields(readFields);
                }
        }
-       
-       /**
-        * Sets the field(s) that are read in the source record(s) from the 
second
-        * input.
-        * 
-        * @param readFields the position(s) in the source record(s)
-        */
-       public void setReadFields2(FieldSet readFields) {
-               this.readFields2 = readFields;
-       }
-       
-       /**
-        * Gets the field(s) in the source record(s) from the second input
-        * that are read.
-        * 
-        * @return the field(s) in the record, or null if they are not set
-        */
-       public FieldSet getReadFields2() {
-               return this.readFields2;
-       }
-       
-       /**
-        * Clears the object.
-        */
-       @Override
-       public void clearProperties() {
-               super.clearProperties();
-               init();
-       }
 
        @Override
-       public boolean isEmpty() {
-               return super.isEmpty() &&
-                               (forwardedFields1 == null || 
forwardedFields1.isEmpty()) &&
-                               (forwardedFields2 == null || 
forwardedFields2.isEmpty()) &&
-                               (readFields1 == null || readFields1.size() == 
0) &&
-                               (readFields2 == null || readFields2.size() == 
0);
-       }
-       
-       @Override
        public String toString() {
-               return "DISP(" + this.forwardedFields1 + "; " + 
this.forwardedFields2 + ")";
+               return "DISP(" + this.fieldMapping1 + "; " + this.fieldMapping2 
+ ")";
        }
 
-       private void init() {
-               this.forwardedFields1 = new HashMap<Integer,FieldSet>();
-               this.forwardedFields2 = new HashMap<Integer,FieldSet>();
-               this.readFields1 = null;
-               this.readFields2 = null;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index 7b90c8e..e99cac7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -144,7 +144,7 @@ public class Ordering {
                }
                
                for (int i = 0; i < this.indexes.size(); i++) {
-                       if (this.indexes.get(i).intValue() != 
otherOrdering.indexes.get(i).intValue()) {
+                       if (!this.indexes.get(i).equals(otherOrdering.indexes.get(i))) { // compare by value: get(i) is boxed, so != would test object identity
                                return false;
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
index da99018..5afee79 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
@@ -21,72 +21,73 @@ package org.apache.flink.api.common.operators;
 
 import java.io.Serializable;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.util.FieldSet;
 
 /**
  * Container for the semantic properties associated to an operator.
  */
-public abstract class SemanticProperties implements Serializable {
-       private boolean allFieldsConstant;
+public interface SemanticProperties extends Serializable {
 
-       private static final long serialVersionUID = 1L;
+       /**
+        * Returns the indexes of all target fields to which a source field has 
been
+        * copied unmodified by a function.
+        *
+        * @param input The input id for the requested source field (0 for 
first input, 1 for second input)
+        * @param sourceField The index of the field for which the target 
position index is requested.
+        * @return A set containing the indexes of all target fields to which 
the source field has been copied unmodified.
+        *
+        */
+       public FieldSet getForwardingTargetFields(int input, int sourceField);
+
+       /**
+        * Returns the index of the source field on the given input from which 
the target field
+        * has been copied unmodified by a function.
+        *
+        * @param input The input id for the requested source field (0 for 
first input, 1 for second input)
+        * @param targetField The index of the target field to which the source 
field has been copied.
+        * @return The index of the source field on the given input that was 
copied to the given target field.
+        *                      -1 if the target field was not copied from any 
source field of the given input.
+        */
+       public int getForwardingSourceField(int input, int targetField);
 
-       /** Set of fields that are written in the destination record(s).*/
-       private FieldSet writtenFields;
-       
-       
        /**
-        * Adds, to the existing information, field(s) that are written in
-        * the destination record(s).
-        * 
-        * @param writtenFields the position(s) in the destination record(s)
+        * Returns the position indexes of all fields of an input that are 
accessed by a function.
+        *
+        * @param input The input id for which accessed fields are requested.
+        * @return A set of fields of the specified input which have been 
accessed by the function. Null if no information is available.
         */
-       public void addWrittenFields(FieldSet writtenFields) {
-               if(this.writtenFields == null) {
-                       this.writtenFields = writtenFields;
-               } else {
-                       this.writtenFields = 
this.writtenFields.addFields(writtenFields);
+       public FieldSet getReadFields(int input);
+
+       // 
----------------------------------------------------------------------
+
+       public static class InvalidSemanticAnnotationException extends 
InvalidProgramException {
+
+               public InvalidSemanticAnnotationException(String s) {
+                       super(s);
                }
-       }
 
-       public void setAllFieldsConstant(boolean constant) {
-               this.allFieldsConstant = constant;
+               public InvalidSemanticAnnotationException(String s, Throwable 
e) {
+                       super(s,e);
+               }
        }
 
-       public boolean isAllFieldsConstant() {
-               return this.allFieldsConstant;
-       }
+       public static class EmptySemanticProperties implements 
SemanticProperties {
+
+               @Override
+               public FieldSet getForwardingTargetFields(int input, int 
sourceField) {
+                       return FieldSet.EMPTY_SET;
+               }
 
-       public abstract FieldSet getForwardFields(int input, int field);
+               @Override
+               public int getForwardingSourceField(int input, int targetField) 
{
+                       return -1;
+               }
 
-       public abstract FieldSet getSourceField(int input, int field);
+               @Override
+               public FieldSet getReadFields(int input) {
+                       return null;
+               }
 
-       /**
-        * Sets the field(s) that are written in the destination record(s).
-        * 
-        * @param writtenFields the position(s) in the destination record(s)
-        */
-       public void setWrittenFields(FieldSet writtenFields) {
-               this.writtenFields = writtenFields;
-       }
-       
-       /**
-        * Gets the field(s) in the destination record(s) that are written.
-        * 
-        * @return the field(s) in the record, or null if they are not set
-        */
-       public FieldSet getWrittenFields() {
-               return this.writtenFields;
-       }
-       
-       /**
-        * Clears the object.
-        */
-       public void clearProperties() {
-               this.writtenFields = null;
-       }
-       
-       public boolean isEmpty() {
-               return this.writtenFields == null || this.writtenFields.size() 
== 0;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
index 5b491bf..eddf89b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
@@ -42,7 +42,7 @@ public abstract class SingleInputOperator<IN, OUT, FT extends 
Function> extends
        private final int[] keyFields;
        
        /** Semantic properties of the associated function. */
-       private SingleInputSemanticProperties semanticProperties;
+       private SingleInputSemanticProperties semanticProperties = new 
SingleInputSemanticProperties();
        
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
index abe995b..23bbc8b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
@@ -26,120 +26,90 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 /**
  * Container for the semantic properties associated to a single input operator.
  */
-public class SingleInputSemanticProperties extends SemanticProperties {
-       
+public class SingleInputSemanticProperties implements SemanticProperties {
        private static final long serialVersionUID = 1L;
-       
-       /**Mapping from fields in the source record(s) to fields in the 
destination record(s). */
-       private Map<Integer,FieldSet> forwardedFields;
-       
-       /** Set of fields that are read in the source record(s).*/
+
+       /**
+        * Mapping from fields in the source record(s) to fields in the 
destination
+        * record(s).
+        */
+       private Map<Integer,FieldSet> fieldMapping;
+
+       /**
+        * Set of fields that are read in the source record(s).
+        */
        private FieldSet readFields;
 
+       public SingleInputSemanticProperties() {
+               this.fieldMapping = new HashMap<Integer, FieldSet>();
+               this.readFields = null;
+       }
+
        @Override
-       public FieldSet getForwardFields(int input, int field) {
+       public FieldSet getForwardingTargetFields(int input, int sourceField) {
                if (input != 0) {
                        throw new IndexOutOfBoundsException();
                }
-               return this.getForwardedField(field);
+
+               return this.fieldMapping.containsKey(sourceField) ? 
this.fieldMapping.get(sourceField) : FieldSet.EMPTY_SET;
        }
 
        @Override
-       public FieldSet getSourceField(int input, int field) {
+       public int getForwardingSourceField(int input, int targetField) {
                if (input != 0) {
                        throw new IndexOutOfBoundsException();
                }
 
-               if (isAllFieldsConstant()) {
-                       return new FieldSet(field);
+               for (Map.Entry<Integer, FieldSet> e : fieldMapping.entrySet()) {
+                       if (e.getValue().contains(targetField)) {
+                               return e.getKey();
+                       }
                }
-
-               return this.forwardedFrom(field);
+               return -1;
        }
 
-       public FieldSet forwardedFrom(int dest) {
-               FieldSet fs = null;
-               for (Map.Entry<Integer, FieldSet> entry : 
forwardedFields.entrySet()) {
-                       if (entry.getValue().contains(dest)) {
-                               if (fs == null) {
-                                       fs = new FieldSet();
-                               }
-
-                               fs = fs.addField(entry.getKey());
-                       }
+       @Override
+       public FieldSet getReadFields(int input) {
+               if (input != 0) {
+                       throw new IndexOutOfBoundsException();
                }
-               return fs;
-       }
 
-       public SingleInputSemanticProperties() {
-               init();
+               return this.readFields;
        }
-       
+
        /**
         * Adds, to the existing information, a field that is forwarded directly
         * from the source record(s) to the destination record(s).
-        * 
+        *
         * @param sourceField the position in the source record(s)
-        * @param destinationField the position in the destination record(s)
+        * @param targetField the position in the destination record(s)
         */
-       public void addForwardedField(int sourceField, int destinationField) {
-               FieldSet old = this.forwardedFields.get(sourceField);
-               if (old == null) {
-                       old = FieldSet.EMPTY_SET;
+       public void addForwardedField(int sourceField, int targetField) {
+               if(isTargetFieldPresent(targetField)) {
+                       throw new InvalidSemanticAnnotationException("Target 
field "+targetField+" was added twice.");
                }
-               
-               FieldSet fs = old.addField(destinationField);
-               this.forwardedFields.put(sourceField, fs);
-       }
-       
-       /**
-        * Adds, to the existing information, a field that is forwarded directly
-        * from the source record(s) to multiple fields in the destination
-        * record(s).
-        * 
-        * @param sourceField the position in the source record(s)
-        * @param destinationFields the position in the destination record(s)
-        */
-       public void addForwardedField(int sourceField, FieldSet 
destinationFields) {
-               FieldSet old = this.forwardedFields.get(sourceField);
-               if (old == null) {
-                       old = FieldSet.EMPTY_SET;
+
+               FieldSet targetFields = fieldMapping.get(sourceField);
+               if (targetFields != null) {
+                       fieldMapping.put(sourceField, 
targetFields.addField(targetField));
+               } else {
+                       fieldMapping.put(sourceField, new 
FieldSet(targetField));
                }
-               
-               FieldSet fs = old.addFields(destinationFields);
-               this.forwardedFields.put(sourceField, fs);
        }
-       
-       /**
-        * Sets a field that is forwarded directly from the source
-        * record(s) to multiple fields in the destination record(s).
-        * 
-        * @param sourceField the position in the source record(s)
-        * @param destinationFields the position in the destination record(s)
-        */
-       public void setForwardedField(int sourceField, FieldSet 
destinationFields) {
-               this.forwardedFields.put(sourceField,destinationFields);
-       }
-       
-       /**
-        * Gets the fields in the destination record where the source
-        * field is forwarded.
-        * 
-        * @param sourceField the position in the source record
-        * @return the destination fields, or null if they do not exist
-        */
-       public FieldSet getForwardedField(int sourceField) {
-               if (isAllFieldsConstant()) {
-                       return new FieldSet(sourceField);
-               }
 
-               return this.forwardedFields.get(sourceField);
+       private boolean isTargetFieldPresent(int targetField) {
+               for(FieldSet targetFields : fieldMapping.values()) {
+                       if(targetFields.contains(targetField)) {
+                               return true;
+                       }
+               }
+               return false;
        }
-       
+
        /**
         * Adds, to the existing information, field(s) that are read in
         * the source record(s).
-        * 
+        *
         * @param readFields the position(s) in the source record(s)
         */
        public void addReadFields(FieldSet readFields) {
@@ -149,112 +119,39 @@ public class SingleInputSemanticProperties extends 
SemanticProperties {
                        this.readFields = this.readFields.addFields(readFields);
                }
        }
-       
-       /**
-        * Sets the field(s) that are read in the source record(s).
-        * 
-        * @param readFields the position(s) in the source record(s)
-        */
-       public void setReadFields(FieldSet readFields) {
-               this.readFields = readFields;
-       }
-       
-       /**
-        * Gets the field(s) in the source record(s) that are read.
-        * 
-        * @return the field(s) in the record, or null if they are not set
-        */
-       public FieldSet getReadFields() {
-               return this.readFields;
-       }
-       
-       /**
-        * Clears the object.
-        */
-       @Override
-       public void clearProperties() {
-               super.clearProperties();
-               init();
-       }
-       
-       @Override
-       public boolean isEmpty() {
-               return super.isEmpty() &&
-                               (forwardedFields == null || 
forwardedFields.isEmpty()) &&
-                               (readFields == null || readFields.size() == 0);
-       }
 
        @Override
        public String toString() {
-               return "SISP(" + this.forwardedFields + ")";
+               return "SISP(" + this.fieldMapping + ")";
        }
 
-       private void init() {
-               this.forwardedFields = new HashMap<Integer,FieldSet>();
-               this.readFields = null;
-       }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
-       public static class AllFieldsConstantProperties extends 
SingleInputSemanticProperties {
-               
-               private static final long serialVersionUID = 1L;
 
-               @Override
-               public FieldSet getReadFields() {
-                       return FieldSet.EMPTY_SET;
-               }
-               
-               @Override
-               public FieldSet getWrittenFields() {
-                       return FieldSet.EMPTY_SET;
-               }
+       public static class AllFieldsForwardedProperties extends 
SingleInputSemanticProperties {
+
+               private static final long serialVersionUID = 1L;
 
                @Override
-               public FieldSet getForwardedField(int sourceField) {
+               public FieldSet getForwardingTargetFields(int input, int 
sourceField) {
+                       if(input != 0) {
+                               throw new IndexOutOfBoundsException();
+                       }
                        return new FieldSet(sourceField);
                }
-               
-               // ----- all mutating operations are unsupported -----
-               
-               @Override
-               public void addForwardedField(int sourceField, FieldSet 
destinationFields) {
-                       throw new UnsupportedOperationException();
-               }
-               
-               @Override
-               public void addForwardedField(int sourceField, int 
destinationField) {
-                       throw new UnsupportedOperationException();
-               }
-               
-               @Override
-               public void setForwardedField(int sourceField, FieldSet 
destinationFields) {
-                       throw new UnsupportedOperationException();
-               }
-               
-               @Override
-               public void addReadFields(FieldSet readFields) {
-                       throw new UnsupportedOperationException();
-               }
-               
-               @Override
-               public void setReadFields(FieldSet readFields) {
-                       throw new UnsupportedOperationException();
-               }
 
                @Override
-               public void addWrittenFields(FieldSet writtenFields) {
-                       throw new UnsupportedOperationException();
+               public int getForwardingSourceField(int input, int targetField) 
{
+                       if(input != 0) {
+                               throw new IndexOutOfBoundsException();
+                       }
+                       return targetField;
                }
 
                @Override
-               public void setWrittenFields(FieldSet writtenFields) {
-                       throw new UnsupportedOperationException();
-               }
-               
-               @Override
-               public boolean isEmpty() {
-                       return false;
+               public void addForwardedField(int sourceField, int targetField) 
{
+                       throw new UnsupportedOperationException("Cannot modify 
forwarded fields");
                }
+
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index 6735298..3602a82 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -82,7 +82,7 @@ public class PartitionOperatorBase<IN> extends 
SingleInputOperator<IN, IN, NoOpF
        
        @Override
        public SingleInputSemanticProperties getSemanticProperties() {
-               return new 
SingleInputSemanticProperties.AllFieldsConstantProperties();
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 0a2f1b0..c3ea0e4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -38,10 +39,41 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
        }
        
        /**
-        * Returns the keyPosition for the given fieldPosition, offsetted by 
the given offset
+        * Returns the flat field descriptors for the given field expression.
+        *
+        * @param fieldExpression The field expression for which the flat field 
descriptors are computed.
+        * @return The list of descriptors for the flat fields which are 
specified by the field expression.
+        */
+       public List<FlatFieldDescriptor> getFlatFields(String fieldExpression) {
+               List<FlatFieldDescriptor> result = new 
ArrayList<FlatFieldDescriptor>();
+               this.getFlatFields(fieldExpression, 0, result);
+               return result;
+       }
+
+       /**
+        * Computes the flat field descriptors for the given field expression 
with the given offset.
+        *
+        * @param fieldExpression The field expression for which the 
FlatFieldDescriptors are computed.
+        * @param offset The offset to use when computing the positions of the 
flat fields.
+        * @param result The list into which all flat field descriptors are 
inserted.
+        */
+       public abstract void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result);
+
+       /**
+        * Returns the type of the (nested) field at the given field expression 
position.
+        * Wildcards are not allowed.
+        *
+        * @param fieldExpression The field expression identifying the field 
whose type is returned.
+        * @return The type of the field at the given field expression.
+        */
+       public abstract <X> TypeInformation<X> getTypeAt(String 
fieldExpression);
+
+       /**
+        * Returns the type of the (unnested) field at the given field position.
+        *
+        * @param pos The position of the (unnested) field in this composite 
type.
+        * @return The type of the field at the given position.
         */
-       public abstract void getKey(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result);
-       
        public abstract <X> TypeInformation<X> getTypeAt(int pos);
        
        /**
@@ -105,8 +137,8 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                private TypeInformation<?> type;
                
                public FlatFieldDescriptor(int keyPosition, TypeInformation<?> 
type) {
-                       if( !(type instanceof AtomicType)) {
-                               throw new IllegalArgumentException("A flattened 
field can only be an atomic type");
+                       if(type instanceof CompositeType) {
+                               throw new IllegalArgumentException("A flattened 
field can not be a composite type");
                        }
                        this.keyPosition = keyPosition;
                        this.type = type;
@@ -126,4 +158,11 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                        return "FlatFieldDescriptor [position="+keyPosition+" 
typeInfo="+type+"]";
                }
        }
+
+       public static class InvalidFieldReferenceException extends 
IllegalArgumentException {
+
+               public InvalidFieldReferenceException(String s) {
+                       super(s);
+               }
+       }
 }

Reply via email to