http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java
new file mode 100644
index 0000000..7ea7930
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class DualInputSemanticPropertiesTest {
+
+       @Test
+       public void testGetTargetFields() {
+               // Checks getForwardingTargetFields(input, source) against the mappings added via addForwardedField.
+               // first input: four sources, each forwarded to exactly one target
+               DualInputSemanticProperties sp = new DualInputSemanticProperties();
+               sp.addForwardedField(0, 0,1);
+               sp.addForwardedField(0, 1,4);
+               sp.addForwardedField(0, 2,3);
+               sp.addForwardedField(0, 3,2);
+
+               assertEquals(1, sp.getForwardingTargetFields(0, 0).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 1).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 2).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 3).size());
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(4));
+               assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+               assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+               assertNotNull(sp.getForwardingTargetFields(0, 4));
+               assertEquals(0, sp.getForwardingTargetFields(0, 4).size());
+               // first input: sources 0 and 1 forwarded to several targets; source 2 is never registered
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(0, 0,0);
+               sp.addForwardedField(0, 0,4);
+               sp.addForwardedField(0, 1,1);
+               sp.addForwardedField(0, 1,2);
+               sp.addForwardedField(0, 1,3);
+
+               assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
+               assertEquals(3, sp.getForwardingTargetFields(0, 1).size());
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+               assertNotNull(sp.getForwardingTargetFields(0, 2));
+               assertEquals(0, sp.getForwardingTargetFields(0, 2).size());
+
+               // second input: same single-target scenario as above, registered on input 1
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(1, 0,1);
+               sp.addForwardedField(1, 1,4);
+               sp.addForwardedField(1, 2,3);
+               sp.addForwardedField(1, 3,2);
+
+               assertEquals(1, sp.getForwardingTargetFields(1, 0).size());
+               assertEquals(1, sp.getForwardingTargetFields(1, 1).size());
+               assertEquals(1, sp.getForwardingTargetFields(1, 2).size());
+               assertEquals(1, sp.getForwardingTargetFields(1, 3).size());
+               assertTrue(sp.getForwardingTargetFields(1, 0).contains(1));
+               assertTrue(sp.getForwardingTargetFields(1, 1).contains(4));
+               assertTrue(sp.getForwardingTargetFields(1, 2).contains(3));
+               assertTrue(sp.getForwardingTargetFields(1, 3).contains(2));
+               assertNotNull(sp.getForwardingTargetFields(1, 4));
+               assertEquals(0, sp.getForwardingTargetFields(1, 4).size());
+               // second input: same multi-target scenario as above, registered on input 1
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(1, 0,0);
+               sp.addForwardedField(1, 0,4);
+               sp.addForwardedField(1, 1,1);
+               sp.addForwardedField(1, 1,2);
+               sp.addForwardedField(1, 1,3);
+
+               assertEquals(2, sp.getForwardingTargetFields(1, 0).size());
+               assertEquals(3, sp.getForwardingTargetFields(1, 1).size());
+               assertTrue(sp.getForwardingTargetFields(1, 0).contains(0));
+               assertTrue(sp.getForwardingTargetFields(1, 0).contains(4));
+               assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
+               assertTrue(sp.getForwardingTargetFields(1, 1).contains(2));
+               assertTrue(sp.getForwardingTargetFields(1, 1).contains(3));
+               assertNotNull(sp.getForwardingTargetFields(1, 2));
+               assertEquals(0, sp.getForwardingTargetFields(1, 2).size());
+
+               // both inputs: mappings of input 0 and input 1 are queried independently
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(0, 2,6);
+               sp.addForwardedField(0, 7,8);
+               sp.addForwardedField(1, 0,1);
+               sp.addForwardedField(1, 1,4);
+
+               assertEquals(1, sp.getForwardingTargetFields(0, 2).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 7).size());
+               assertEquals(1, sp.getForwardingTargetFields(1, 0).size());
+               assertEquals(1, sp.getForwardingTargetFields(1, 1).size());
+               assertTrue(sp.getForwardingTargetFields(0, 2).contains(6));
+               assertTrue(sp.getForwardingTargetFields(0, 7).contains(8));
+               assertTrue(sp.getForwardingTargetFields(1, 0).contains(1));
+               assertTrue(sp.getForwardingTargetFields(1, 1).contains(4));
+               assertNotNull(sp.getForwardingTargetFields(0, 1));
+               assertNotNull(sp.getForwardingTargetFields(1, 4));
+               assertEquals(0, sp.getForwardingTargetFields(0, 1).size());
+               assertEquals(0, sp.getForwardingTargetFields(1, 4).size());
+               // both inputs, several targets per source; target 8 is registered once for each input
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(0, 0,0);
+               sp.addForwardedField(0, 0,4);
+               sp.addForwardedField(0, 3,8);
+               sp.addForwardedField(1, 1,1);
+               sp.addForwardedField(1, 1,2);
+               sp.addForwardedField(1, 4,8);
+
+               assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 3).size());
+               assertEquals(2, sp.getForwardingTargetFields(1, 1).size());
+               assertEquals(1, sp.getForwardingTargetFields(1, 4).size());
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+               assertTrue(sp.getForwardingTargetFields(0, 3).contains(8));
+               assertTrue(sp.getForwardingTargetFields(1, 1).contains(1));
+               assertTrue(sp.getForwardingTargetFields(1, 1).contains(2));
+               assertTrue(sp.getForwardingTargetFields(1, 4).contains(8));
+
+       }
+
+       @Test
+       public void testGetSourceField() {
+               // Checks that getForwardingSourceField(input, target) returns the source registered for that target.
+               // first input: one target per source; targets 0 and 5 are never registered and must yield a negative value
+               DualInputSemanticProperties sp = new 
DualInputSemanticProperties();
+               sp.addForwardedField(0, 0,1);
+               sp.addForwardedField(0, 1,4);
+               sp.addForwardedField(0, 2,3);
+               sp.addForwardedField(0, 3,2);
+
+               assertEquals(0, sp.getForwardingSourceField(0, 1));
+               assertEquals(1, sp.getForwardingSourceField(0, 4));
+               assertEquals(2, sp.getForwardingSourceField(0, 3));
+               assertEquals(3, sp.getForwardingSourceField(0, 2));
+               assertTrue(sp.getForwardingSourceField(0, 0) < 0);
+               assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+               // first input: several targets share a source, so each of them must map back to that source
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(0, 0,0);
+               sp.addForwardedField(0, 0,4);
+               sp.addForwardedField(0, 1,1);
+               sp.addForwardedField(0, 1,2);
+               sp.addForwardedField(0, 1,3);
+
+               assertEquals(0, sp.getForwardingSourceField(0, 0));
+               assertEquals(0, sp.getForwardingSourceField(0, 4));
+               assertEquals(1, sp.getForwardingSourceField(0, 1));
+               assertEquals(1, sp.getForwardingSourceField(0, 2));
+               assertEquals(1, sp.getForwardingSourceField(0, 3));
+               assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+
+               // second input: same two scenarios as above, registered and queried on input 1
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(1, 0,1);
+               sp.addForwardedField(1, 1,4);
+               sp.addForwardedField(1, 2,3);
+               sp.addForwardedField(1, 3,2);
+
+               assertEquals(0, sp.getForwardingSourceField(1, 1));
+               assertEquals(1, sp.getForwardingSourceField(1, 4));
+               assertEquals(2, sp.getForwardingSourceField(1, 3));
+               assertEquals(3, sp.getForwardingSourceField(1, 2));
+               assertTrue(sp.getForwardingSourceField(1, 0) < 0);
+               assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+
+               sp = new DualInputSemanticProperties();
+               sp.addForwardedField(1, 0,0);
+               sp.addForwardedField(1, 0,4);
+               sp.addForwardedField(1, 1,1);
+               sp.addForwardedField(1, 1,2);
+               sp.addForwardedField(1, 1,3);
+
+               assertEquals(0, sp.getForwardingSourceField(1, 0));
+               assertEquals(0, sp.getForwardingSourceField(1, 4));
+               assertEquals(1, sp.getForwardingSourceField(1, 1));
+               assertEquals(1, sp.getForwardingSourceField(1, 2));
+               assertEquals(1, sp.getForwardingSourceField(1, 3));
+               assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+       }
+
+       @Test
+       public void testGetReadSet() {
+               // Checks that getReadFields(input) reflects every FieldSet passed to addReadFields for that input.
+               // first input
+               DualInputSemanticProperties sp = new 
DualInputSemanticProperties();
+               sp.addReadFields(0, new FieldSet(0, 1));
+
+               assertEquals(2, sp.getReadFields(0).size());
+               assertTrue(sp.getReadFields(0).contains(0));
+               assertTrue(sp.getReadFields(0).contains(1));
+               // a second addReadFields call on the same instance must add to, not replace, the earlier fields
+               sp.addReadFields(0, new FieldSet(3));
+
+               assertEquals(3, sp.getReadFields(0).size());
+               assertTrue(sp.getReadFields(0).contains(0));
+               assertTrue(sp.getReadFields(0).contains(1));
+               assertTrue(sp.getReadFields(0).contains(3));
+
+               // second input: same scenario on a fresh instance, using input 1
+               sp = new DualInputSemanticProperties();
+               sp.addReadFields(1, new FieldSet(0, 1));
+
+               assertEquals(2, sp.getReadFields(1).size());
+               assertTrue(sp.getReadFields(1).contains(0));
+               assertTrue(sp.getReadFields(1).contains(1));
+
+               sp.addReadFields(1, new FieldSet(3));
+
+               assertEquals(3, sp.getReadFields(1).size());
+               assertTrue(sp.getReadFields(1).contains(0));
+               assertTrue(sp.getReadFields(1).contains(1));
+               assertTrue(sp.getReadFields(1).contains(3));
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testAddForwardedFieldsTargetTwice1() {
+               // Input 0: forwarding sources 0 and 1 to the same target 2 is expected to throw.
+               DualInputSemanticProperties sp = new 
DualInputSemanticProperties();
+               sp.addForwardedField(0, 0, 2);
+               sp.addForwardedField(0, 1, 2);
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testAddForwardedFieldsTargetTwice2() {
+               // Input 1: forwarding sources 0 and 1 to the same target 2 is expected to throw.
+               DualInputSemanticProperties sp = new 
DualInputSemanticProperties();
+               sp.addForwardedField(1, 0, 2);
+               sp.addForwardedField(1, 1, 2);
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java
new file mode 100644
index 0000000..5348653
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import static org.junit.Assert.*;
+
+import 
org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+public class SingleInputSemanticPropertiesTest {
+
+       @Test
+       public void testGetTargetFields() {
+               // Checks getForwardingTargetFields(0, source) against the mappings added via addForwardedField(source, target).
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               sp.addForwardedField(0,1);
+               sp.addForwardedField(1,4);
+               sp.addForwardedField(2,3);
+               sp.addForwardedField(3,2);
+
+               assertEquals(1, sp.getForwardingTargetFields(0, 0).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 1).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 2).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 3).size());
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(4));
+               assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
+               assertTrue(sp.getForwardingTargetFields(0, 3).contains(2));
+               assertNotNull(sp.getForwardingTargetFields(0, 4));
+               assertEquals(0, sp.getForwardingTargetFields(0, 4).size());
+               // sources forwarded to several targets; the never-registered source 2 must yield a non-null empty set
+               sp = new SingleInputSemanticProperties();
+               sp.addForwardedField(0,0);
+               sp.addForwardedField(0,4);
+               sp.addForwardedField(1,1);
+               sp.addForwardedField(1,2);
+               sp.addForwardedField(1,3);
+
+               assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
+               assertEquals(3, sp.getForwardingTargetFields(0, 1).size());
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
+               assertNotNull(sp.getForwardingTargetFields(0, 2));
+               assertEquals(0, sp.getForwardingTargetFields(0, 2).size());
+
+       }
+
+       @Test
+       public void testGetSourceField() {
+               // Checks that getForwardingSourceField(0, target) returns the source registered for that target.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               sp.addForwardedField(0,1);
+               sp.addForwardedField(1,4);
+               sp.addForwardedField(2,3);
+               sp.addForwardedField(3,2);
+               // targets 0 and 5 are never registered above and are expected to yield a negative value
+               assertEquals(0, sp.getForwardingSourceField(0, 1));
+               assertEquals(1, sp.getForwardingSourceField(0, 4));
+               assertEquals(2, sp.getForwardingSourceField(0, 3));
+               assertEquals(3, sp.getForwardingSourceField(0, 2));
+               assertTrue(sp.getForwardingSourceField(0, 0) < 0);
+               assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+               // several targets share a source, so each of them must map back to that source
+               sp = new SingleInputSemanticProperties();
+               sp.addForwardedField(0,0);
+               sp.addForwardedField(0,4);
+               sp.addForwardedField(1,1);
+               sp.addForwardedField(1,2);
+               sp.addForwardedField(1,3);
+
+               assertEquals(0, sp.getForwardingSourceField(0, 0));
+               assertEquals(0, sp.getForwardingSourceField(0, 4));
+               assertEquals(1, sp.getForwardingSourceField(0, 1));
+               assertEquals(1, sp.getForwardingSourceField(0, 2));
+               assertEquals(1, sp.getForwardingSourceField(0, 3));
+               assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+
+       }
+
+       @Test
+       public void testGetReadSet() {
+               // Checks that getReadFields(0) reflects every FieldSet passed to addReadFields.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               sp.addReadFields(new FieldSet(0, 1));
+
+               assertEquals(2, sp.getReadFields(0).size());
+               assertTrue(sp.getReadFields(0).contains(0));
+               assertTrue(sp.getReadFields(0).contains(1));
+               // a second addReadFields call on the same instance must add to, not replace, the earlier fields
+               sp.addReadFields(new FieldSet(3));
+
+               assertEquals(3, sp.getReadFields(0).size());
+               assertTrue(sp.getReadFields(0).contains(0));
+               assertTrue(sp.getReadFields(0).contains(1));
+               assertTrue(sp.getReadFields(0).contains(3));
+       }
+
+       @Test(expected = InvalidSemanticAnnotationException.class)
+       public void testAddForwardedFieldsTargetTwice() {
+               // Forwarding sources 0 and 1 to the same target 2 is expected to throw.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               sp.addForwardedField(0, 2);
+               sp.addForwardedField(1, 2);
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testGetTargetFieldInvalidIndex() {
+               // Querying target fields with input index 1 is expected to throw IndexOutOfBoundsException.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               sp.addForwardedField(0,0);
+
+               sp.getForwardingTargetFields(1, 0);
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testGetSourceFieldInvalidIndex() {
+               // Querying the source field with input index 1 is expected to throw IndexOutOfBoundsException.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               sp.addForwardedField(0,0);
+
+               sp.getForwardingSourceField(1, 0);
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testGetReadFieldsInvalidIndex() {
+               // Querying read fields with input index 1 is expected to throw IndexOutOfBoundsException.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               sp.addReadFields(new FieldSet(0, 1));
+
+               sp.getReadFields(1);
+       }
+
+       @Test
+       public void testAllForwardedSingleInputSemProps() {
+               // No addForwardedField calls are made: AllFieldsForwardedProperties must map each sampled field to itself.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+
+               assertEquals(1, sp.getForwardingTargetFields(0, 0).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 1).size());
+               assertEquals(1, sp.getForwardingTargetFields(0, 123).size());
+               assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
+               assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
+               assertTrue(sp.getForwardingTargetFields(0, 123).contains(123));
+               // reverse lookup: the source of each sampled target is the same field index
+               assertEquals(0, sp.getForwardingSourceField(0, 0));
+               assertEquals(2, sp.getForwardingSourceField(0, 2));
+               assertEquals(123, sp.getForwardingSourceField(0, 123));
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testAllForwardedSingleInputSemPropsInvalidIndex1() {
+               // Source lookup with input index 1 on AllFieldsForwardedProperties is expected to throw.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+               sp.getForwardingSourceField(1, 0);
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testAllForwardedSingleInputSemPropsInvalidIndex2() {
+               // Target lookup with input index 1 on AllFieldsForwardedProperties is expected to throw.
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+               sp.getForwardingTargetFields(1, 0);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
index 73733fd..7383457 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -198,6 +199,7 @@ public class KMeans {
        // 
*************************************************************************
        
        /** Converts a Tuple2<Double,Double> into a Point. */
+       @ForwardedFields("0->x; 1->y")
        public static final class TuplePointConverter implements 
MapFunction<Tuple2<Double, Double>, Point> {
 
                @Override
@@ -207,6 +209,7 @@ public class KMeans {
        }
        
        /** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
+       @ForwardedFields("0->id; 1->x; 2->y")
        public static final class TupleCentroidConverter implements 
MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
 
                @Override
@@ -216,6 +219,7 @@ public class KMeans {
        }
        
        /** Determines the closest cluster center for a data point. */
+       @ForwardedFields("*->1")
        public static final class SelectNearestCenter extends 
RichMapFunction<Point, Tuple2<Integer, Point>> {
                private Collection<Centroid> centroids;
 
@@ -248,7 +252,8 @@ public class KMeans {
                }
        }
        
-       /** Appends a count variable to the tuple. */ 
+       /** Appends a count variable to the tuple. */
+       @ForwardedFields("f0;f1")
        public static final class CountAppender implements 
MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
 
                @Override
@@ -258,6 +263,7 @@ public class KMeans {
        }
        
        /** Sums and counts point coordinates. */
+       @ForwardedFields("0")
        public static final class CentroidAccumulator implements 
ReduceFunction<Tuple3<Integer, Point, Long>> {
 
                @Override
@@ -267,6 +273,7 @@ public class KMeans {
        }
        
        /** Computes new centroid from coordinate sum and count of points. */
+       @ForwardedFields("0->id")
        public static final class CentroidAverager implements 
MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index cb3b8d5..bd74b20 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
@@ -126,7 +126,7 @@ public class ConnectedComponents implements 
ProgramDescription {
        /**
         * Function that turns a value into a 2-tuple where both fields are 
that value.
         */
-       @ConstantFields("0 -> 0,1") 
+       @ForwardedFields("*->f0")
        public static final class DuplicateValue<T> implements MapFunction<T, 
Tuple2<T, T>> {
                
                @Override
@@ -155,8 +155,8 @@ public class ConnectedComponents implements 
ProgramDescription {
         * a vertex is associated with, with a (Source-Vertex-ID, 
Target-VertexID) edge. The function
         * produces a (Target-vertex-ID, Component-ID) pair.
         */
-       @ConstantFieldsFirst("1 -> 1")
-       @ConstantFieldsSecond("1 -> 0")
+       @ForwardedFieldsFirst("f1->f1")
+       @ForwardedFieldsSecond("f1->f0")
        public static final class NeighborWithComponentIDJoin implements 
JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                @Override
@@ -167,7 +167,7 @@ public class ConnectedComponents implements 
ProgramDescription {
        
 
 
-       @ConstantFieldsFirst("0")
+       @ForwardedFieldsFirst("*")
        public static final class ComponentIdFilter implements 
FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                @Override
@@ -178,6 +178,8 @@ public class ConnectedComponents implements 
ProgramDescription {
                }
        }
 
+
+
        @Override
        public String getDescription() {
                return "Parameters: <vertices-path> <edges-path> <result-path> 
<max-number-of-iterations>";

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
index 2b65ad6..5af60be 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
@@ -119,6 +120,7 @@ public class EnumTrianglesBasic {
        // 
*************************************************************************
 
        /** Converts a Tuple2 into an Edge */
+       @ForwardedFields("0;1")
        public static class TupleEdgeConverter implements 
MapFunction<Tuple2<Integer, Integer>, Edge> {
                private final Edge outEdge = new Edge();
                
@@ -149,6 +151,7 @@ public class EnumTrianglesBasic {
         *  The first vertex of a triad is the shared vertex, the second and 
third vertex are ordered by vertexId. 
         *  Assumes that input edges share the first vertex and are in 
ascending order of the second vertex.
         */
+       @ForwardedFields("0")
        private static class TriadBuilder implements GroupReduceFunction<Edge, 
Triad> {
                private final List<Integer> vertices = new ArrayList<Integer>();
                private final Triad outTriad = new Triad();

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
index 3f05a2b..fb1e6f5 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
@@ -134,6 +135,7 @@ public class EnumTrianglesOpt {
        // 
*************************************************************************
        
        /** Converts a Tuple2 into an Edge */
+       @ForwardedFields("0;1")
        public static class TupleEdgeConverter implements 
MapFunction<Tuple2<Integer, Integer>, Edge> {
                private final Edge outEdge = new Edge();
                
@@ -209,6 +211,7 @@ public class EnumTrianglesOpt {
         * Builds an edge with degree annotation from two edges that have the 
same vertices and only one 
         * degree annotation.
         */
+       @ForwardedFields("0;1")
        private static class DegreeJoiner implements 
ReduceFunction<EdgeWithDegrees> {
                private final EdgeWithDegrees outEdge = new EdgeWithDegrees();
                
@@ -269,6 +272,7 @@ public class EnumTrianglesOpt {
         *  The first vertex of a triad is the shared vertex, the second and 
third vertex are ordered by vertexId. 
         *  Assumes that input edges share the first vertex and are in 
ascending order of the second vertex.
         */
+       @ForwardedFields("0")
        private static class TriadBuilder implements GroupReduceFunction<Edge, 
Triad> {
                
                private final List<Integer> vertices = new ArrayList<Integer>();

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
index 4e4d9ad..d622799 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
@@ -156,7 +156,7 @@ public class PageRankBasic {
         * A reduce function that takes a sequence of edges and builds the 
adjacency list for the vertex where the edges
         * originate. Run as a pre-processing step.
         */
-       @ConstantFields("0")
+       @ForwardedFields("0")
        public static final class BuildOutgoingEdgeList implements 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
                
                private final ArrayList<Long> neighbors = new ArrayList<Long>();
@@ -194,7 +194,7 @@ public class PageRankBasic {
        /**
         * The function that applies the page rank dampening formula
         */
-       @ConstantFields("0")
+       @ForwardedFields("0")
        public static final class Dampener implements 
MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
 
                private final double dampening;

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index abe96c8..d0fefeb 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -64,7 +64,7 @@ public class TransitiveClosureNaive implements 
ProgramDescription {
                                        public Tuple2<Long, Long> 
join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
                                                return new Tuple2<Long, 
Long>(left.f0, right.f1);
                                        }
-                               })
+                               
}).withForwardedFieldsFirst("0").withForwardedFieldsSecond("1")
                                .union(paths)
                                .groupBy(0, 1)
                                .reduceGroup(new 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
@@ -72,7 +72,7 @@ public class TransitiveClosureNaive implements 
ProgramDescription {
                                        public void 
reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) 
throws Exception {
                                                
out.collect(values.iterator().next());
                                        }
-                               });
+                               }).withForwardedFields("0;1");
 
                DataSet<Tuple2<Long,Long>> newPaths = paths
                                .coGroup(nextPaths)
@@ -90,7 +90,7 @@ public class TransitiveClosureNaive implements 
ProgramDescription {
                                                        }
                                                }
                                        }
-                               });
+                               
}).withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
 
                DataSet<Tuple2<Long, Long>> transitiveClosure = 
paths.closeWith(nextPaths, newPaths);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 9694647..86b7de2 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.examples.java.ml.util.LinearRegressionData;
@@ -184,6 +185,7 @@ public class LinearRegression {
        // 
*************************************************************************
 
        /** Converts a Tuple2<Double,Double> into a Data. */
+       @ForwardedFields("0->x; 1->y")
        public static final class TupleDataConverter implements 
MapFunction<Tuple2<Double, Double>, Data> {
 
                @Override
@@ -193,6 +195,7 @@ public class LinearRegression {
        }
 
        /** Converts a Tuple2<Double,Double> into a Params. */
+       @ForwardedFields("0->theta0; 1->theta1")
        public static final class TupleParamsConverter implements 
MapFunction<Tuple2<Double, Double>,Params> {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index 44f134a..890af65 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -21,6 +21,7 @@ package org.apache.flink.examples.java.relational;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -234,6 +235,7 @@ public class WebLogAnalysis {
         * If the first input does not provide any pairs, all pairs of the 
second input are emitted.
         * Otherwise, no pair is emitted.
         */
+       @ForwardedFieldsFirst("*")
        public static class AntiJoinVisits implements 
CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, 
Tuple3<Integer, String, Integer>> {
 
                /**

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
index 269ba7f..11430e9 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.examples.scala.clustering
 
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.examples.java.clustering.util.KMeansData
@@ -83,10 +84,10 @@ object KMeans {
     val finalCentroids = centroids.iterate(numIterations) { currentCentroids =>
       val newCentroids = points
         .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, 
"centroids")
-        .map { x => (x._1, x._2, 1L) }
+        .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2")
         .groupBy(0)
-        .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }
-        .map { x => new Centroid(x._1, x._2.div(x._3)) }
+        .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) 
}.withForwardedFields("_1")
+        .map { x => new Centroid(x._1, x._2.div(x._3)) 
}.withForwardedFields("_1->id")
       newCentroids
     }
 
@@ -222,6 +223,7 @@ object KMeans {
   }
 
   /** Determines the closest cluster center for a data point. */
+  @ForwardedFields(Array("*->_2"))
   final class SelectNearestCenter extends RichMapFunction[Point, (Int, Point)] 
{
     private var centroids: Traversable[Centroid] = null
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
index dfebfe9..2bb6916 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -69,7 +69,7 @@ object ConnectedComponents {
 
     // read vertex and edge data
     // assign the initial components (equal to the vertex id)
-    val vertices = getVerticesDataSet(env).map { id => (id, id) }
+    val vertices = getVerticesDataSet(env).map { id => (id, id) 
}.withForwardedFields("*->_1;*->_2")
 
     // undirected edges by emitting for each input edge the input edges itself 
and an inverted
     // version
@@ -82,7 +82,7 @@ object ConnectedComponents {
         // apply the step logic: join with the edges
         val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) 
=>
           (edge._2, vertex._2)
-        }
+        
}.withForwardedFieldsFirst("_2->_2").withForwardedFieldsSecond("_2->_1")
 
         // select the minimum neighbor
         val minNeighbors = allNeighbors.groupBy(0).min(1)
@@ -91,7 +91,7 @@ object ConnectedComponents {
         val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
           (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
             if (newVertex._2 < oldVertex._2) out.collect(newVertex)
-        }
+        }.withForwardedFieldsFirst("*")
 
         // delta and new workset are identical
         (updatedComponents, updatedComponents)

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index d111890..b4955ed 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -70,6 +70,7 @@ object DeltaPageRank {
       .groupBy(0).sum(1)
 
     val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - 
INITIAL_RANK) }
+                                      .withForwardedFields("_1")
 
     val iteration = initialRanks.iterateDelta(initialDeltas, maxIterations, 
Array(0)) {
 
@@ -91,7 +92,7 @@ object DeltaPageRank {
 
           val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) {
             (current, delta) => (current._1, current._2 + delta._2)
-          }
+          }.withForwardedFieldsFirst("_1")
 
           (rankUpdates, deltas)
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index 6ebacee..2623c0c 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.examples.scala.graph
 
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
 import org.apache.flink.api.scala._
 import scala.collection.JavaConverters._
 import org.apache.flink.api.scala.ExecutionEnvironment
@@ -85,6 +86,7 @@ object EnumTrianglesBasic {
             .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new 
TriadBuilder())
             // filter triads
             .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => 
t }
+              .withForwardedFieldsFirst("*")
     
     // emit result
     if (fileOutput) {
@@ -114,6 +116,7 @@ object EnumTrianglesBasic {
    *  of a triad is the shared vertex, the second and third vertex are ordered 
by vertexId. Assumes
    *  that input edges share the first vertex and are in ascending order of 
the second vertex.
    */
+  @ForwardedFields(Array("v1->v1"))
   class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
 
     val vertices = mutable.MutableList[Integer]()

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
index bde1bf3..64aeb77 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.examples.scala.graph
 
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
 import org.apache.flink.api.scala._
 import scala.collection.JavaConverters._
 import org.apache.flink.api.scala.ExecutionEnvironment
@@ -97,7 +98,7 @@ object EnumTrianglesOpt {
           } else {
             new EdgeWithDegrees(e1.v1, e2.d1, e1.v2, e1.d2)
           }
-      }
+      }.withForwardedFields("v1;v2")
 
     // project edges by degrees, vertex with smaller degree comes first
     val edgesByDegree = edgesWithDegrees
@@ -111,6 +112,7 @@ object EnumTrianglesOpt {
       .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new 
TriadBuilder())
       // filter triads
       .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t}
+        .withForwardedFieldsFirst("*")
 
     // emit result
     if (fileOutput) {
@@ -182,6 +184,7 @@ object EnumTrianglesOpt {
    * Assumes that input edges share the first vertex and are in ascending 
order of the second
    * vertex.
    */
+  @ForwardedFields(Array("v1"))
   class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
 
     val vertices = mutable.MutableList[Integer]()

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index 1d70ae9..dc42ed1 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -84,7 +84,7 @@ object PageRankBasic {
     val links = getLinksDataSet(env)
 
     // assign initial ranks to pages
-    val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
+    val pagesWithRanks = pages.map(p => Page(p, 1.0 / 
numPages)).withForwardedFields("*->pageId")
 
     // build adjacency list from link input
     val adjacencyLists = links
@@ -112,7 +112,7 @@ object PageRankBasic {
           // apply dampening factor
           .map { p =>
             Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - 
DAMPENING_FACTOR) / numPages))
-          }
+          }.withForwardedFields("pageId")
 
         // terminate if no rank update was significant
         val termination = 
currentRanks.join(newRanks).where("pageId").equalTo("pageId") {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index b281b98..d171611 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -38,10 +38,10 @@ object TransitiveClosureNaive {
         .join(edges)
         .where(1).equalTo(0) {
           (left, right) => (left._1,right._2)
-        }
+        }.withForwardedFieldsFirst("_1").withForwardedFieldsSecond("_2")
         .union(prevPaths)
         .groupBy(0, 1)
-        .reduce((l, r) => l)
+        .reduce((l, r) => l).withForwardedFields("_1; _2")
 
       val terminate = prevPaths
         .coGroup(nextPaths)
@@ -51,7 +51,7 @@ object TransitiveClosureNaive {
             for (n <- next)
               if (!prevPaths.contains(n)) out.collect(n)
           }
-      }
+      }.withForwardedFieldsSecond("*")
       (nextPaths, terminate)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
index 38b6415..950e3c8 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -110,12 +110,12 @@ object WebLogAnalysis {
 
     val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) {
       (doc, rank) => rank
-    }
+    }.withForwardedFieldsSecond("*")
 
     val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
       (ranks, visits, out: Collector[(Int, String, Int)]) =>
         if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
-    }
+    }.withForwardedFieldsFirst("*")
 
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index 25875a0..09678fd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -29,251 +29,280 @@ import java.util.Set;
 import org.apache.flink.api.common.InvalidProgramException;
 
 /**
- * This class defines the semantic assertions that can be added to functions.
- * The assertions are realized as java annotations, to be added to the class 
declaration of
- * the class that implements the functions. For example, to declare the 
<i>ConstantFields</i>
- * annotation for a map-type function that simply copies some fields, use it 
the following way:
+ * This class defines Java annotations for semantic assertions that can be 
added to Flink functions.
+ * Semantic annotations can help the Flink optimizer to generate more 
efficient execution plans for Flink programs.
+ * For example, a <i>ForwardedFields</i> assertion for a map-type function can 
be declared as:
  *
  * <pre><blockquote>
- * \@ConstantFields({"0->0,1", "1->2"})
- * public class MyMapper extends FlatMapFunction<Tuple3<String, Integer, 
Integer>, Tuple3<String, String, Integer>>
+ * \@ForwardedFields({"f0; f1->f2"})
+ * public class MyMapper extends MapFunction<Tuple3<String, String, Integer>, 
Tuple3<String, Integer, Integer>>
  * {
- *     public void flatMap(Tuple3<String, Integer, Integer> value, 
Collector<Tuple3<String, String, Integer>> out) {
- *         value.f2 = value.f1
- *         value.f1 = value.f0;
- *         out.collect(value);
+ *     public Tuple3<String, Integer, Integer> map(Tuple3<String, String, 
Integer> val) {
+ *
+ *         return new Tuple3<String, Integer, Integer>(val.f0, val.f2, 1);
  *     }
  * }
  * </blockquote></pre>
+ *
  * <p>
- * All annotations takes String arrays. The Strings represent the source and 
destination fields.
- * The transition is represented by the arrow "->".
- * Fields are described by their tuple position (and later also the names of 
the fields in the objects).
- * The left hand side of the arrow always describes the fields in the input 
value(s), i.e. the value that 
- * is passed as a parameter to the function call, or the values obtained from 
the input iterator. The right
- * hand side of the arrow describes the field in the value returned from the 
function. If the right hand side
- * is omitted, the a field is assumed to stay exactly the same, i.e. the field 
itself is unmodified, rather 
- * than that the value is placed into another field.
+ * All annotations take Strings with expressions that refer to (nested) value 
fields of the input and output types of a function.
+ * Field expressions for composite data types (tuples, POJOs, Scala case 
classes) can be expressed in
+ * different ways, depending on the data type they refer to.
+ *</p>
+ *
+ * <ul>
+ *     <li>Java tuple data types (such as {@link 
org.apache.flink.api.java.tuple.Tuple3}): A tuple field can be addressed using
+ * its 0-offset index or name, e.g., the second field of a Java tuple is 
addressed by <code>"1"</code> or <code>"f1"</code>.</li>
+ *     <li>Java POJO data types: A POJO field is addressed using its name, 
e.g., <code>"xValue"</code> for the member field
+ * <code>xValue</code> of a POJO type that describes a 2d-coordinate.</li>
+ *     <li>Scala tuple data types (such as {@link scala.Tuple3}): A tuple 
field can be addressed using its 1-offset name
+ * (following Scala conventions) or 0-offset index, e.g., the second field of 
a Scala tuple is addressed by
+ * <code>"_2"</code> or <code>"1"</code>.</li>
+ *     <li>Scala case classes: A case class field is addressed using its 
name, e.g., <code>"xValue"</code> for the field <code>xValue</code>
+ * of a case class that describes a 2d-coordinate.</li>
+ * </ul>
+ *
  * <p>
- * <b>
- * It is very important to follow a conservative strategy when specifying 
constant fields.
- * Only fields that are always constant (regardless of value, stub call, etc.) 
to the output may be
- * declared as such! Otherwise, the correct execution of a program can not be 
guaranteed. So if in doubt,
- * do not add a field to this set.
+ * Nested fields are addressed by navigation, e.g., <code>"f1.xValue"</code> 
addresses the field <code>xValue</code> of a POJO type,
+ * that is stored at the second field of a Java tuple. In order to refer to 
all fields of a composite type (or the composite type itself)
+ * such as a tuple, POJO, or case class type, a <code>"*"</code> wildcard can 
be used, e.g., <code>f2.*</code> or <code>f2</code> reference all fields
+ * of a composite type at the third position of a Java tuple.
+ * </p>
+ *
+ * <b>NOTE: The use of semantic annotations is optional!
+ * If used correctly, semantic annotations can help the Flink optimizer to 
generate more efficient execution plans.
+ * However, incorrect semantic annotations can cause the optimizer to generate 
incorrect execution plans which compute wrong results!
+ * So be careful when adding semantic annotations.
  * </b>
- * <p>
- * Be aware that some annotations should only be used for functions with as 
single input
- * ({@link org.apache.flink.api.common.functions.RichMapFunction}, {@link 
org.apache.flink.api.common.functions.RichReduceFunction}) and some only for 
stubs with two inputs
- * ({@link org.apache.flink.api.common.functions.RichCrossFunction}, {@link 
org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link 
org.apache.flink.api.common.functions.RichCoGroupFunction}).
+ *
  */
 public class FunctionAnnotation {
 
        /**
-        * This annotation declares that a function leaves certain fields of 
its input values unmodified and
-        * only "forwards" or "copies" them to the return value. The annotation 
is applicable to unary
-        * functions, like for example {@link 
org.apache.flink.api.common.functions.RichMapFunction}, {@link 
org.apache.flink.api.common.functions.RichReduceFunction}, or {@link 
org.apache.flink.api.common.functions.RichFlatMapFunction}.
-        * <p>
-        * The following example illustrates a function that keeps the tuple's 
field zero constant:
-        * <pre><blockquote>
-        * \@ConstantFields("0")
-        * public class MyMapper extends MapFunction<Tuple3<String, Integer, 
Integer>, Tuple2<String, Double>>
-        * {
-        *     public Tuple2<String, Double> map(Tuple3<String, Integer, 
Integer> value) {
-        *         return new Tuple2<String, Double>(value.f0, value.f1 * 0.5);
-        *     }
-        * }
-        * </blockquote></pre>
-        * <p>
-        * (Note that you could equivalently write {@code @ConstantFields("0 -> 
0")}.
-        * <p>
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsExcept} annotation.
-        * <p>
-        * If neither this annotation, nor the {@link ConstantFieldsExcept} 
annotation are set, it is
-        * assumed that <i>no</i> field in the input is forwarded/copied 
unmodified.
-        * 
-        * @see ConstantFieldsExcept
+        * The ForwardedFields annotation declares fields which are never 
modified by the annotated function and
+        * which are forwarded at the same position to the output or copied 
unchanged to another position in the output.
+        *
+        * Fields that are forwarded at the same position can be specified by 
their position.
+        * The specified position must be valid for the input and output data 
type and have the same type.
+        * For example <code>\@ForwardedFields({"f2"})</code> declares that the 
third field of a Java input tuple is
+        * copied to the third field of an output tuple.
+        *
+        * Fields which are copied unchanged to another position in the output 
are declared by specifying the
+        * source field expression in the input and the target field expression 
in the output.
+        * <code>\@ForwardedFields({"f0->f2"})</code> denotes that the first 
field of the Java input tuple is
+        * copied unchanged to the third field of the Java output tuple. When 
using the wildcard ("*") ensure that
+        * the number of declared fields and their types in input and output 
type match.
+        *
+        * Multiple forwarded fields can be annotated in one 
(<code>\@ForwardedFields({"f2; f3->f0; f4"})</code>)
+        * or separate Strings (<code>\@ForwardedFields({"f2", "f3->f0", 
"f4"})</code>).
+        *
+        * <b>NOTE: The use of the ForwardedFields annotation is optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
+        * It is NOT required that all forwarded fields are declared, but all 
declarations must be correct.
+        * </b>
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFields {
+       public @interface ForwardedFields {
                String[] value();
        }
 
        /**
-        * This annotation declares that a function leaves certain fields of 
its first input values unmodified and
-        * only "forwards" or "copies" them to the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link 
org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link 
org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link 
org.apache.flink.api.common.functions.RichCrossFunction}.
-        * <p>
-        * The following example illustrates a join function that copies fields 
from the first and second input to the
-        * return value:
-        * <pre><blockquote>
-        * \@ConstantFieldsFirst("1 -> 0")
-        * \@ConstantFieldsFirst("1 -> 1")
-        * public class MyJoin extends JoinFunction<Tuple2<String, Integer>, 
Tuple2<String, String>, Tuple2<Integer, String>>
-        * {
-        *     public Tuple2<Integer, String> map(Tuple2<String, Integer> 
first, Tuple2<String, String> second) {
-        *         return new Tuple2<String, Double>(first.f1, second.f1);
-        *     }
-        * }
-        * </blockquote></pre>
-        * <p>
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsFirstExcept} annotation.
-        * <p>
-        * If neither this annotation, nor the {@link 
ConstantFieldsFirstExcept} annotation are set, it is
-        * assumed that <i>no</i> field in the first input is forwarded/copied 
unmodified.
-        * 
-        * @see ConstantFieldsSecond
-        * @see ConstantFields
+        * The ForwardedFieldsFirst annotation declares fields of the first 
input of a function which are
+        * never modified by the annotated function and which are forwarded at 
the same position to the
+        * output or copied unchanged to another position in the output.
+        *
+        * Fields that are forwarded from the first input at the same position 
in the output can be
+        * specified by their position. The specified position must be valid 
for the input and output data type and have the same type.
+        * For example <code>\@ForwardedFieldsFirst({"f2"})</code> declares 
that the third field of a Java input tuple at the first input is
+        * copied to the third field of an output tuple.
+        *
+        * Fields which are copied unchanged to another position in the output 
are declared by specifying the
+        * source field expression in the input and the target field expression 
in the output.
+        * <code>\@ForwardedFieldsFirst({"f0->f2"})</code> denotes that the 
first field of the Java input tuple at the first input is
+        * copied unchanged to the third field of the Java output tuple. When 
using the wildcard ("*"), ensure that
+        * the number of declared fields and their types in input and output 
type match.
+        *
+        * Multiple forwarded fields can be annotated in one 
(<code>\@ForwardedFieldsFirst({"f2; f3->f0; f4"})</code>)
+        * or separate Strings (<code>\@ForwardedFieldsFirst({"f2", "f3->f0", 
"f4"})</code>).
+        *
+        * <b>NOTE: The use of the ForwardedFieldsFirst annotation is optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
+        * It is NOT required that all forwarded fields are declared, but all 
declarations must be correct.
+        * </b>
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
+        * Forwarded fields from the second input can be specified using the
+        * {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} 
annotation.
+        *
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsFirst {
+       public @interface ForwardedFieldsFirst {
                String[] value();
        }
 
        /**
-        * This annotation declares that a function leaves certain fields of 
its second input values unmodified and
-        * only "forwards" or "copies" them to the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link 
org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link 
org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link 
org.apache.flink.api.common.functions.RichCrossFunction}.
-        * <p>
-        * The following example illustrates a join function that copies fields 
from the first and second input to the
-        * return value:
-        * <pre><blockquote>
-        * \@ConstantFieldsFirst("1 -> 0")
-        * \@ConstantFieldsFirst("1 -> 1")
-        * public class MyJoin extends JoinFunction<Tuple2<String, Integer>, 
Tuple2<String, String>, Tuple2<Integer, String>>
-        * {
-        *     public Tuple2<Integer, String> map(Tuple2<String, Integer> 
first, Tuple2<String, String> second) {
-        *         return new Tuple2<String, Double>(first.f1, second.f1);
-        *     }
-        * }
-        * </blockquote></pre>
-        * <p>
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsSecond} annotation.
-        * <p>
-        * If neither this annotation, nor the {@link 
ConstantFieldsSecondExcept} annotation are set, it is
-        * assumed that <i>no</i> field in the second input is forwarded/copied 
unmodified.
-        * 
-        * @see ConstantFieldsFirst
-        * @see ConstantFields
+        * The ForwardedFieldsSecond annotation declares fields of the second 
input of a function which are
+        * never modified by the annotated function and which are forwarded at 
the same position to the
+        * output or copied unchanged to another position in the output.
+        *
+        * Fields that are forwarded from the second input at the same position 
in the output can be
+        * specified by their position. The specified position must be valid 
for the input and output data type and have the same type.
+        * For example <code>\@ForwardedFieldsSecond({"f2"})</code> declares 
that the third field of a Java input tuple at the second input is
+        * copied to the third field of an output tuple.
+        *
+        * Fields which are copied unchanged to another position in the output 
are declared by specifying the
+        * source field expression in the input and the target field expression 
in the output.
+        * <code>\@ForwardedFieldsSecond({"f0->f2"})</code> denotes that the 
first field of the Java input tuple at the second input is
+        * copied unchanged to the third field of the Java output tuple. When 
using the wildcard ("*"), ensure that
+        * the number of declared fields and their types in input and output 
type match.
+        *
+        * Multiple forwarded fields can be annotated in one 
(<code>\@ForwardedFieldsSecond({"f2; f3->f0; f4"})</code>)
+        * or separate Strings (<code>\@ForwardedFieldsSecond({"f2", "f3->f0", 
"f4"})</code>).
+        *
+        * <b>NOTE: The use of the ForwardedFieldsSecond annotation is optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
+        * It is NOT required that all forwarded fields are declared, but all 
declarations must be correct.
+        * </b>
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
+        * Forwarded fields from the first input can be specified using the
+        * {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} 
annotation.
+        *
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsSecond {
+       public @interface ForwardedFieldsSecond {
                String[] value();
        }
 
        /**
-        * This annotation declares that a function changes certain fields of 
its input values, while leaving all
-        * others unmodified and in place in the return value. The annotation 
is applicable to unary
-        * functions, like for example {@link 
org.apache.flink.api.common.functions.RichMapFunction}, {@link 
org.apache.flink.api.common.functions.RichReduceFunction}, or {@link 
org.apache.flink.api.common.functions.RichFlatMapFunction}.
-        * <p>
-        * The following example illustrates that at the example of a Map 
function:
-        * 
-        * <pre><blockquote>
-        * \@ConstantFieldsExcept("1")
-        * public class MyMapper extends MapFunction<Tuple3<String, Integer, 
Double>, Tuple3<String, Double, Double>>
-        * {
-        *     public Tuple3<String, String, Double> map(Tuple3<String, 
Integer, Double> value) {
-        *         return new Tuple3<String, String, Double>(value.f0, value.f2 
/ 2, value.f2);
-        *     }
-        * }
-        * </blockquote></pre>
-        * <p>
-        * The annotation takes one String array specifying the positions of 
the input types that do not remain constant.
-        * When this annotation is used, it is assumed that all other values 
remain at the same position in input and output.
-        * To model more complex situations use the {@link ConstantFields}s 
annotation.
-        * <p>
-        * This annotation is mutually exclusive with the {@link 
ConstantFields} annotation.
-        * <p>
-        * If neither this annotation, nor the {@link ConstantFields} 
annotation are set, it is
-        * assumed that <i>no</i> field in the input is forwarded/copied 
unmodified.
-        * 
-        * @see ConstantFieldsExcept
+        * The NonForwardedFields annotation declares ALL fields which are not 
preserved on the same position in a function's output.
+        * ALL other fields are considered to be unmodified at the same 
position.
+        * Hence, the NonForwardedFields annotation is inverse to the {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} 
annotation.
+        *
+        * <b>NOTE: The use of the NonForwardedFields annotation is optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
+        * Since all undeclared fields are considered to be forwarded, it is 
required that ALL non-forwarded fields are declared.
+        * </b>
+        *
+        * Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFields({"f1; f3"})</code>
+        * declares that the second and fourth field of a Java tuple are 
modified and all other fields are not changed and remain
+        * on their position. A NonForwardedFields annotation can only be used 
on functions where the type of the input and output are identical.
+        *
+        * Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFields({"f1; f3"})</code>)
+        * or separate Strings (<code>\@NonForwardedFields({"f1", 
"f3"})</code>).
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsExcept {
+       public @interface NonForwardedFields {
                String[] value();
        }
 
        /**
-        * This annotation declares that a function changes certain fields of 
its first input value, while leaving all
-        * others unmodified and in place in the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link 
org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link 
org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link 
org.apache.flink.api.common.functions.RichCrossFunction}.
-        * <p>
-        * The following example illustrates a join function that copies fields 
from the first and second input to the
-        * return value:
-        * 
-        * <pre><blockquote>
-        * \@ConstantFieldsFirstExcept("1")
-        * public class MyJoin extends JoinFunction<Tuple3<String, Integer, 
Double>, Tuple2<String, Double>, Tuple3<String, Double, Double>>
-        * {
-        *     public Tuple3<String, Double, Double> map(Tuple3<String, 
Integer, Double> first, Tuple2<String, Double> second) {
-        *         return Tuple3<String, Double, Double>(first.f0, second.f1, 
first.f2);
-        *     }
-        * }
-        * </blockquote></pre>
-        * <p>
-        * The annotation takes one String array specifying the positions of 
the input types that do not remain constant.
-        * When this annotation is used, it is assumed that all other values 
remain at the same position in input and output.
-        * To model more complex situations use the {@link ConstantFields}s 
annotation.
-        * <p>
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsFirst}
-        * <p>
-        * If neither this annotation, nor the {@link ConstantFieldsFirst} 
annotation are set, it is
-        * assumed that <i>no</i> field in the first input is forwarded/copied 
unmodified.
-        * 
-        * @see ConstantFieldsFirst
-        * @see ConstantFieldsSecond
-        * @see ConstantFieldsSecondExcept
+        * The NonForwardedFieldsFirst annotation declares for a function ALL 
fields of its first input
+        * which are not preserved on the same position in its output.
+        * ALL other fields are considered to be unmodified at the same 
position.
+        * Hence, the NonForwardedFieldsFirst annotation is inverse to the 
{@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} 
annotation.
+        *
+        * <b>NOTE: The use of the NonForwardedFieldsFirst annotation is 
optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
+        * Since all undeclared fields are considered to be forwarded, it is 
required that ALL non-forwarded fields of the first input are declared.
+        * </b>
+        *
+        * Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFieldsFirst({"f1; f3"})</code>
+        * declares that the second and fourth field of a Java tuple from the 
first input are modified and
+        * all other fields of the first input are not changed and remain 
on their position.
+        * A NonForwardedFieldsFirst annotation can only be used on functions 
where the type of the first input and the output are identical.
+        *
+        * Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFieldsFirst({"f1; f3"})</code>)
+        * or separate Strings (<code>\@NonForwardedFieldsFirst({"f1", 
"f3"})</code>).
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsFirstExcept {
+       public @interface NonForwardedFieldsFirst {
                String[] value();
        }
 
        /**
-        * This annotation declares that a function changes certain fields of 
its second input value, while leaving all
-        * others unmodified and in place in the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link 
org.apache.flink.api.common.functions.RichFlatJoinFunction}, {@link 
org.apache.flink.api.common.functions.RichCoGroupFunction}, or {@link 
org.apache.flink.api.common.functions.RichCrossFunction}.
-        * <p>
-        * The following example illustrates a join function that copies fields 
from the first and second input to the
-        * return value:
-        * 
-        * <pre><blockquote>
-        * \@ConstantFieldsSecondExcept("1")
-        * public class MyJoin extends JoinFunction<Tuple2<String, Double>, 
Tuple3<String, Integer, Double>, Tuple3<String, Double, Double>>
-        * {
-        *     public Tuple3<String, Double, Double> map(Tuple2<String, Double> 
first, Tuple3<String, Integer, Double> second) {
-        *         return Tuple3<String, Double, Double>(second.f0, first.f1, 
second.f2);
-        *     }
-        * }
-        * </blockquote></pre>
-        * <p>
-        * The annotation takes one String array specifying the positions of 
the input types that do not remain constant.
-        * When this annotation is used, it is assumed that all other values 
remain at the same position in input and output.
-        * To model more complex situations use the {@link ConstantFields}s 
annotation.
-        * <p>
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsSecond}
-        * <p>
-        * If neither this annotation, nor the {@link ConstantFieldsSecond} 
annotation are set, it is
-        * assumed that <i>no</i> field in the second input is forwarded/copied 
unmodified.
-        * 
-        * @see ConstantFieldsFirst
-        * @see ConstantFieldsFirstExcept
-        * @see ConstantFieldsSecond
+        * The NonForwardedFieldsSecond annotation declares for a function ALL 
fields of its second input
+        * which are not preserved on the same position in its output.
+        * ALL other fields are considered to be unmodified at the same 
position.
+        * Hence, the NonForwardedFieldsSecond annotation is inverse to the 
{@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} 
annotation.
+        *
+        * <b>NOTE: The use of the NonForwardedFieldsSecond annotation is 
optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
+        * Since all undeclared fields are considered to be forwarded, it is 
required that ALL non-forwarded fields of the second input are declared.
+        * </b>
+        *
+        * Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFieldsSecond({"f1; f3"})</code>
+        * declares that the second and fourth field of a Java tuple from the 
second input are modified and
+        * all other fields of the second input are not changed and remain 
on their position.
+        * A NonForwardedFieldsSecond annotation can only be used on functions 
where the type of the second input and the output are identical.
+        *
+        * Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFieldsSecond({"f1; f3"})</code>)
+        * or separate Strings (<code>\@NonForwardedFieldsSecond({"f1", 
"f3"})</code>).
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsSecondExcept {
+       public @interface NonForwardedFieldsSecond {
                String[] value();
        }
 
        /**
-        * Specifies the fields of the input value of a user-defined that are 
accessed in the code.
-        * This annotation can only be used with user-defined functions with 
one input (Map, Reduce, ...).
+        * The ReadFields annotation declares for a function all fields which 
it accesses and evaluates, i.e.,
+        * all fields that are used by the function to compute its result.
+        * For example, fields which are evaluated in conditional statements or 
used for computations are considered to be read.
+        * Fields which are only copied unmodified to the output without 
evaluating their values are NOT considered to be read.
+        *
+        * <b>NOTE: The use of the ReadFields annotation is optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * The ReadFields annotation requires that ALL read fields are declared.
+        * Otherwise, it can cause invalid plan choices and the computation of 
wrong results!
+        * Declaring a non-read field as read is not harmful but might reduce 
optimization potential.
+        * </b>
+        *
+        * Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFields({"f0; f2"})</code> declares the first and third
+        * field of a Java input tuple to be read. All other fields are 
considered to not influence the behavior of the function.
+        *
+        * Multiple read fields can be declared in one <code>\@ReadFields({"f0; 
f2"})</code> or
+        * multiple separate Strings <code>\@ReadFields({"f0", "f2"})</code>.
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
@@ -282,22 +311,62 @@ public class FunctionAnnotation {
        }
 
        /**
-        * Specifies the fields of the first input value of a user-defined that 
are accessed in the code.
-        * This annotation can only be used with user-defined functions with 
two inputs (Join, Cross, ...).
+        * The ReadFieldsFirst annotation declares for a function all fields of 
the first input which it accesses and evaluates, i.e.,
+        * all fields of the first input that are used by the function to 
compute its result.
+        * For example, fields which are evaluated in conditional statements or 
used for computations are considered to be read.
+        * Fields which are only copied unmodified to the output without 
evaluating their values are NOT considered to be read.
+        *
+        * <b>NOTE: The use of the ReadFieldsFirst annotation is optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * The ReadFieldsFirst annotation requires that ALL read fields of the 
first input are declared.
+        * Otherwise, it can cause invalid plan choices and the computation of 
wrong results!
+        * Declaring a non-read field as read is not harmful but might reduce 
optimization potential.
+        * </b>
+        *
+        * Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFieldsFirst({"f0; f2"})</code> declares the first and third
+        * field of a Java input tuple of the first input to be read.
+        * All other fields of the first input are considered to not influence 
the behavior of the function.
+        *
+        * Multiple read fields can be declared in one 
<code>\@ReadFieldsFirst({"f0; f2"})</code> or
+        * multiple separate Strings <code>\@ReadFieldsFirst({"f0", 
"f2"})</code>.
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ReadFieldsSecond {
+       public @interface ReadFieldsFirst {
                String[] value();
        }
 
        /**
-        * Specifies the fields of the second input value of a user-defined 
that are accessed in the code.
-        * This annotation can only be used with user-defined functions with 
two inputs (Join, Cross, ...).
+        * The ReadFieldsSecond annotation declares for a function all fields 
of the second input which it accesses and evaluates, i.e.,
+        * all fields of the second input that are used by the function to 
compute its result.
+        * For example, fields which are evaluated in conditional statements or 
used for computations are considered to be read.
+        * Fields which are only copied unmodified to the output without 
evaluating their values are NOT considered to be read.
+        *
+        * <b>NOTE: The use of the ReadFieldsSecond annotation is optional.
+        * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
+        * The ReadFieldsSecond annotation requires that ALL read fields of the 
second input are declared.
+        * Otherwise, it can cause invalid plan choices and the computation of 
wrong results!
+        * Declaring a non-read field as read is not harmful but might reduce 
optimization potential.
+        * </b>
+        *
+        * Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFieldsSecond({"f0; f2"})</code> declares the first and third
+        * field of a Java input tuple of the second input to be read.
+        * All other fields of the second input are considered to not influence 
the behavior of the function.
+        *
+        * Multiple read fields can be declared in one 
<code>\@ReadFieldsSecond({"f0; f2"})</code> or
+        * multiple separate Strings <code>\@ReadFieldsSecond({"f0", 
"f2"})</code>.
+        *
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field expressions such as nested fields and wildcard.
+        *
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       public @interface ReadFieldsFirst {
+       public @interface ReadFieldsSecond {
                String[] value();
        }
        
@@ -311,111 +380,82 @@ public class FunctionAnnotation {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Reads the annotations of a user defined function with one input and 
returns semantic properties according to the constant fields annotated.
+        * Reads the annotations of a user defined function with one input and 
returns the forwarded, non-forwarded, and read field annotations found on it.
         * 
         * @param udfClass The user defined function, represented by its class.
-        * @return      The DualInputSemanticProperties containing the constant 
fields.
+        * @return      The set of semantic annotations found on the class, or 
null if the class carries none of them.
         */
-       public static Set<Annotation> readSingleConstantAnnotations(Class<?> 
udfClass) {
-               ConstantFields constantSet = 
udfClass.getAnnotation(ConstantFields.class);
-               ConstantFieldsExcept notConstantSet = 
udfClass.getAnnotation(ConstantFieldsExcept.class);
-               ReadFields readfieldSet = 
udfClass.getAnnotation(ReadFields.class);
-
-               Set<Annotation> result = null;
-
-               if (notConstantSet != null && constantSet != null) {
-                       throw new InvalidProgramException("Either " + 
ConstantFields.class.getSimpleName() + " or " + 
-                                       
ConstantFieldsExcept.class.getSimpleName() + " can be annotated to a function, 
not both.");
-               }
-
-               if (notConstantSet != null) {
-                       result = new HashSet<Annotation>();
-                       result.add(notConstantSet);
+       public static Set<Annotation> readSingleForwardAnnotations(Class<?> 
udfClass) {
+               ForwardedFields forwardedFields = 
udfClass.getAnnotation(ForwardedFields.class);
+               NonForwardedFields nonForwardedFields = 
udfClass.getAnnotation(NonForwardedFields.class);
+               ReadFields readSet = udfClass.getAnnotation(ReadFields.class);
+
+               Set<Annotation> annotations = new HashSet<Annotation>();
+               if(forwardedFields != null) {
+                       annotations.add(forwardedFields);
                }
-               if (constantSet != null) {
-                       result = new HashSet<Annotation>();
-                       result.add(constantSet);
-               }
-
-               if (readfieldSet != null) {
-                       if (result == null) {
-                               result = new HashSet<Annotation>();
+               if(nonForwardedFields != null) {
+                       if(!annotations.isEmpty()) {
+                               throw new InvalidProgramException("Either " + 
ForwardedFields.class.getSimpleName() + " or " +
+                                               
NonForwardedFields.class.getSimpleName() + " can be annotated to a function, 
not both.");
                        }
-                       result.add(readfieldSet);
+                       annotations.add(nonForwardedFields);
+               }
+               if(readSet != null) {
+                       annotations.add(readSet);
                }
 
-               return result;
+               return !annotations.isEmpty() ? annotations : null;
        }
 
        // 
--------------------------------------------------------------------------------------------
        /**
-        * Reads the annotations of a user defined function with two inputs and 
returns semantic properties according to the constant fields annotated.
+        * Reads the annotations of a user defined function with two inputs and 
returns the forwarded, non-forwarded, and read field annotations found for both inputs.
         * @param udfClass The user defined function, represented by its class.
-        * @return      The DualInputSemanticProperties containing the constant 
fields.
+        * @return      The set of semantic annotations found on the class, or 
null if the class carries none of them.
         */
 
-       public static Set<Annotation> readDualConstantAnnotations(Class<?> 
udfClass) {
+       public static Set<Annotation> readDualForwardAnnotations(Class<?> 
udfClass) {
 
                // get readSet annotation from stub
-               ConstantFieldsFirst constantSet1 = 
udfClass.getAnnotation(ConstantFieldsFirst.class);
-               ConstantFieldsSecond constantSet2= 
udfClass.getAnnotation(ConstantFieldsSecond.class);
+               ForwardedFieldsFirst forwardedFields1 = 
udfClass.getAnnotation(ForwardedFieldsFirst.class);
+               ForwardedFieldsSecond forwardedFields2= 
udfClass.getAnnotation(ForwardedFieldsSecond.class);
 
                // get readSet annotation from stub
-               ConstantFieldsFirstExcept notConstantSet1 = 
udfClass.getAnnotation(ConstantFieldsFirstExcept.class);
-               ConstantFieldsSecondExcept notConstantSet2 = 
udfClass.getAnnotation(ConstantFieldsSecondExcept.class);
+               NonForwardedFieldsFirst nonForwardedFields1 = 
udfClass.getAnnotation(NonForwardedFieldsFirst.class);
+               NonForwardedFieldsSecond nonForwardedFields2 = 
udfClass.getAnnotation(NonForwardedFieldsSecond.class);
 
-               ReadFieldsFirst readfieldSet1 = 
udfClass.getAnnotation(ReadFieldsFirst.class);
-               ReadFieldsSecond readfieldSet2 = 
udfClass.getAnnotation(ReadFieldsSecond.class);
+               ReadFieldsFirst readSet1 = 
udfClass.getAnnotation(ReadFieldsFirst.class);
+               ReadFieldsSecond readSet2 = 
udfClass.getAnnotation(ReadFieldsSecond.class);
 
-               if (notConstantSet1 != null && constantSet1 != null) {
-                       throw new InvalidProgramException("Either " + 
ConstantFieldsFirst.class.getSimpleName() + " or " + 
-                                       
ConstantFieldsFirstExcept.class.getSimpleName() + " can be annotated to a 
function, not both.");
-               }
+               Set<Annotation> annotations = new HashSet<Annotation>();
 
-               if (constantSet2 != null && notConstantSet2 != null) {
-                       throw new InvalidProgramException("Either " + 
ConstantFieldsSecond.class.getSimpleName() + " or " + 
-                                       
ConstantFieldsSecondExcept.class.getSimpleName() + " can be annotated to a 
function, not both.");
+               if (nonForwardedFields1 != null && forwardedFields1 != null) {
+                       throw new InvalidProgramException("Either " + 
ForwardedFieldsFirst.class.getSimpleName() + " or " +
+                                       
NonForwardedFieldsFirst.class.getSimpleName() + " can be annotated to a 
function, not both.");
+               } else if (forwardedFields1 != null) {
+                       annotations.add(forwardedFields1);
+               } else if (nonForwardedFields1 != null) {
+                       annotations.add(nonForwardedFields1);
                }
 
-               Set<Annotation> result = null;
-
-               if (notConstantSet2 != null) {
-                       result = new HashSet<Annotation>();
-                       result.add(notConstantSet2);
-               }
-               if (constantSet2 != null) {
-                       result = new HashSet<Annotation>();
-                       result.add(constantSet2);
+               if (forwardedFields2 != null && nonForwardedFields2 != null) {
+                       throw new InvalidProgramException("Either " + 
ForwardedFieldsSecond.class.getSimpleName() + " or " +
+                                       
NonForwardedFieldsSecond.class.getSimpleName() + " can be annotated to a 
function, not both.");
+               } else if (forwardedFields2 != null) {
+                       annotations.add(forwardedFields2);
+               } else if (nonForwardedFields2 != null) {
+                       annotations.add(nonForwardedFields2);
                }
 
-               if (readfieldSet2 != null) {
-                       if (result == null) {
-                               result = new HashSet<Annotation>();
-                       }
-                       result.add(readfieldSet2);
-               }
-
-               if (notConstantSet1 != null) {
-                       if (result == null) {
-                               result = new HashSet<Annotation>();
-                       }
-                       result.add(notConstantSet1);
+               if (readSet1 != null) {
+                       annotations.add(readSet1);
                }
-               if (constantSet1 != null) {
-                       if (result == null) {
-                               result = new HashSet<Annotation>();
-                       }
-                       result.add(constantSet1);
-               }
-
-               if (readfieldSet1 != null) {
-                       if (result == null) {
-                               result = new HashSet<Annotation>();
-                       }
-                       result.add(readfieldSet1);
+               if (readSet2 != null) {
+                       annotations.add(readSet2);
                }
 
-               return result;
+               return !annotations.isEmpty() ? annotations : null;
        }
 }
 

Reply via email to