http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index e5d7155..341d87e 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -37,16 +39,15 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
 
-/**
- * Created by sebastian on 6/19/14.
- */
 public class SemanticPropertiesProjectionTest {
 
        final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData 
= new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
 
+
        final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> 
tupleTypeInfo = new
                        TupleTypeInfo<Tuple5<Integer, Long, String, Long, 
Integer>>(
                        BasicTypeInfo.INT_TYPE_INFO,
@@ -56,184 +57,228 @@ public class SemanticPropertiesProjectionTest {
                        BasicTypeInfo.INT_TYPE_INFO
        );
 
+       final List<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, 
Long>, String>> emptyNestedTupleData =
+                       new ArrayList<Tuple4<Integer, Tuple3<String, Integer, 
Long>, Tuple2<Long, Long>, String>>();
+
+       final TupleTypeInfo<Tuple4<Integer, Tuple3<String, Integer, Long>, 
Tuple2<Long, Long>, String>> nestedTupleTypeInfo = new
+                       TupleTypeInfo<Tuple4<Integer, Tuple3<String, Integer, 
Long>, Tuple2<Long, Long>, String>>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       new TupleTypeInfo<Tuple3<String,  Integer, 
Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO),
+                       new TupleTypeInfo<Tuple2<Long, 
Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
+                       BasicTypeInfo.STRING_TYPE_INFO
+                       );
 
        @Test
-       public void ProjectOperatorTest() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<Tuple5<Integer, Long, String, Long, Integer>> 
tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-
-                       tupleDs.project(1, 3, 2).print();
-
-                       Plan plan = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       PlanProjectOperator<?, ?> projectOperator = 
((PlanProjectOperator<?, ?>) sink.getInput());
-
-                       SingleInputSemanticProperties props = 
projectOperator.getSemanticProperties();
-
-                       assertTrue(props.getForwardedField(1).size() == 1);
-                       assertTrue(props.getForwardedField(3).size() == 1);
-                       assertTrue(props.getForwardedField(2).size() == 1);
-                       assertTrue(props.getForwardedField(1).contains(0));
-                       assertTrue(props.getForwardedField(3).contains(1));
-                       assertTrue(props.getForwardedField(2).contains(2));
-               } catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testProjectionSemProps1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = 
env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+               tupleDs.project(1, 3, 2).project(0, 3).print();
+
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               PlanProjectOperator<?, ?> projectOperator = 
((PlanProjectOperator<?, ?>) sink.getInput());
+
+               SingleInputSemanticProperties props = 
projectOperator.getSemanticProperties();
+
+               assertEquals(1, props.getForwardingTargetFields(0, 0).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 1).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 2).size());
+               assertEquals(2, props.getForwardingTargetFields(0, 3).size());
+
+               assertTrue(props.getForwardingTargetFields(0, 1).contains(0));
+               assertTrue(props.getForwardingTargetFields(0, 3).contains(1));
+               assertTrue(props.getForwardingTargetFields(0, 2).contains(2));
+               assertTrue(props.getForwardingTargetFields(0, 0).contains(3));
+               assertTrue(props.getForwardingTargetFields(0, 3).contains(4));
        }
-       
+
        @Test
-       public void ProjectOperatorWithoutTypesClassTest() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<Tuple5<Integer, Long, String, Long, Integer>> 
tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-                       
-                       tupleDs.project(1, 3).project(2).project(0).print();
-
-                       Plan plan = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       PlanProjectOperator<?, ?> projectOperator = 
((PlanProjectOperator<?, ?>) sink.getInput());
-
-                       SingleInputSemanticProperties props = 
projectOperator.getSemanticProperties();
-
-                       assertTrue(props.getForwardedField(1).size() == 1);
-                       assertTrue(props.getForwardedField(3).size() == 1);
-                       assertTrue(props.getForwardedField(2).size() == 1);
-                       assertTrue(props.getForwardedField(0).size() == 1);
-                       assertTrue(props.getForwardedField(1).contains(0));
-                       assertTrue(props.getForwardedField(3).contains(1));
-                       assertTrue(props.getForwardedField(2).contains(2));
-                       assertTrue(props.getForwardedField(0).contains(3));
-               } catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testProjectionSemProps2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, 
Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, 
nestedTupleTypeInfo);
+
+               tupleDs.project(2, 3, 1).project(2).print();
+
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               PlanProjectOperator<?, ?> projectOperator = 
((PlanProjectOperator<?, ?>) sink.getInput());
+
+               SingleInputSemanticProperties props = 
projectOperator.getSemanticProperties();
+
+               assertNotNull(props.getForwardingTargetFields(0, 0));
+               assertEquals(1, props.getForwardingTargetFields(0, 1).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 2).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 3).size());
+               assertEquals(2, props.getForwardingTargetFields(0, 4).size());
+               assertEquals(2, props.getForwardingTargetFields(0, 5).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 6).size());
+               assertEquals(0, props.getForwardingTargetFields(0, 0).size());
+
+               assertTrue(props.getForwardingTargetFields(0, 4).contains(0));
+               assertTrue(props.getForwardingTargetFields(0, 5).contains(1));
+               assertTrue(props.getForwardingTargetFields(0, 6).contains(2));
+               assertTrue(props.getForwardingTargetFields(0, 1).contains(3));
+               assertTrue(props.getForwardingTargetFields(0, 2).contains(4));
+               assertTrue(props.getForwardingTargetFields(0, 3).contains(5));
+               assertTrue(props.getForwardingTargetFields(0, 4).contains(6));
+               assertTrue(props.getForwardingTargetFields(0, 5).contains(7));
        }
 
        @Test
-       public void JoinProjectionTest() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<Tuple5<Integer, Long, String, Long, Integer>> 
tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-
-                       
tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2, 3).projectSecond(1, 
4).print();
-
-                       Plan plan = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = 
((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
-
-                       DualInputSemanticProperties props = 
projectJoinOperator.getSemanticProperties();
-
-                       assertTrue(props.getForwardedField1(2).size() == 1);
-                       assertTrue(props.getForwardedField1(3).size() == 1);
-                       assertTrue(props.getForwardedField2(1).size() == 1);
-                       assertTrue(props.getForwardedField2(4).size() == 1);
-                       assertTrue(props.getForwardedField1(2).contains(0));
-                       assertTrue(props.getForwardedField1(3).contains(1));
-                       assertTrue(props.getForwardedField2(1).contains(2));
-                       assertTrue(props.getForwardedField2(4).contains(3));
-               } catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testJoinProjectionSemProps1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = 
env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+               tupleDs.join(tupleDs).where(0).equalTo(0)
+                               .projectFirst(2, 3)
+                               .projectSecond(1, 4)
+                               .print();
+
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = 
((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
+
+               DualInputSemanticProperties props = 
projectJoinOperator.getSemanticProperties();
+
+               assertEquals(1, props.getForwardingTargetFields(0, 2).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 3).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 1).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 4).size());
+
+               assertTrue(props.getForwardingTargetFields(0, 2).contains(0));
+               assertTrue(props.getForwardingTargetFields(0, 3).contains(1));
+               assertTrue(props.getForwardingTargetFields(1, 1).contains(2));
+               assertTrue(props.getForwardingTargetFields(1, 4).contains(3));
        }
-       
+
        @Test
-       public void JoinProjectionWithoutTypesClassTest() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<Tuple5<Integer, Long, String, Long, Integer>> 
tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-                       
-                       
tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2).projectFirst(3).projectSecond(1,
 4).print();
-
-                       Plan plan = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = 
((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
-
-                       DualInputSemanticProperties props = 
projectJoinOperator.getSemanticProperties();
-
-                       assertTrue(props.getForwardedField1(2).size() == 1);
-                       assertTrue(props.getForwardedField1(3).size() == 1);
-                       assertTrue(props.getForwardedField2(1).size() == 1);
-                       assertTrue(props.getForwardedField2(4).size() == 1);
-                       assertTrue(props.getForwardedField1(2).contains(0));
-                       assertTrue(props.getForwardedField1(3).contains(1));
-                       assertTrue(props.getForwardedField2(1).contains(2));
-                       assertTrue(props.getForwardedField2(4).contains(3));
-               } catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testJoinProjectionSemProps2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, 
Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, 
nestedTupleTypeInfo);
+
+               tupleDs.join(tupleDs).where(0).equalTo(0)
+                               .projectFirst(2,0)
+                               .projectSecond(1,3)
+                               .print();
+
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = 
((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
+
+               DualInputSemanticProperties props = 
projectJoinOperator.getSemanticProperties();
+
+               assertEquals(1, props.getForwardingTargetFields(0, 0).size());
+               assertNotNull(props.getForwardingTargetFields(0, 1));
+               assertNotNull(props.getForwardingTargetFields(0, 2));
+               assertNotNull(props.getForwardingTargetFields(0, 3));
+               assertEquals(1, props.getForwardingTargetFields(0, 4).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 5).size());
+               assertNotNull(props.getForwardingTargetFields(0, 6));
+               assertEquals(0, props.getForwardingTargetFields(0, 1).size());
+               assertEquals(0, props.getForwardingTargetFields(0, 2).size());
+               assertEquals(0, props.getForwardingTargetFields(0, 3).size());
+               assertEquals(0, props.getForwardingTargetFields(0, 6).size());
+
+               assertNotNull(props.getForwardingTargetFields(1, 0));
+               assertEquals(1, props.getForwardingTargetFields(1, 1).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 2).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 3).size());
+               assertNotNull(props.getForwardingTargetFields(1, 4));
+               assertNotNull(props.getForwardingTargetFields(1, 5));
+               assertEquals(1, props.getForwardingTargetFields(1, 6).size());
+               assertEquals(0, props.getForwardingTargetFields(1, 0).size());
+               assertEquals(0, props.getForwardingTargetFields(1, 4).size());
+               assertEquals(0, props.getForwardingTargetFields(1, 5).size());
+
+               assertTrue(props.getForwardingTargetFields(0, 4).contains(0));
+               assertTrue(props.getForwardingTargetFields(0, 5).contains(1));
+               assertTrue(props.getForwardingTargetFields(0, 0).contains(2));
+               assertTrue(props.getForwardingTargetFields(1, 1).contains(3));
+               assertTrue(props.getForwardingTargetFields(1, 2).contains(4));
+               assertTrue(props.getForwardingTargetFields(1, 3).contains(5));
+               assertTrue(props.getForwardingTargetFields(1, 6).contains(6));
        }
 
        @Test
-       public void CrossProjectionTest() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<Tuple5<Integer, Long, String, Long, Integer>> 
tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-
-                       DataSet<Tuple4<String, Long, Long, Integer>> result = 
tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4);
-                       result.print();
-
-                       Plan plan = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = 
((CrossOperatorBase<?, ?, ?, ?>) sink.getInput());
-
-                       DualInputSemanticProperties props = 
projectCrossOperator.getSemanticProperties();
-
-                       assertTrue(props.getForwardedField1(2).size() == 1);
-                       assertTrue(props.getForwardedField1(3).size() == 1);
-                       assertTrue(props.getForwardedField2(1).size() == 1);
-                       assertTrue(props.getForwardedField2(4).size() == 1);
-                       assertTrue(props.getForwardedField1(2).contains(0));
-                       assertTrue(props.getForwardedField1(3).contains(1));
-                       assertTrue(props.getForwardedField2(1).contains(2));
-                       assertTrue(props.getForwardedField2(4).contains(3));
-               } catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testCrossProjectionSemProps1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = 
env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+               tupleDs.cross(tupleDs)
+                               .projectFirst(2, 3)
+                               .projectSecond(1, 4)
+                               .print();
+
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = 
((CrossOperatorBase<?, ?, ?, ?>) sink.getInput());
+
+               DualInputSemanticProperties props = 
projectCrossOperator.getSemanticProperties();
+
+               assertEquals(1, props.getForwardingTargetFields(0, 2).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 3).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 1).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 4).size());
+
+               assertTrue(props.getForwardingTargetFields(0, 2).contains(0));
+               assertTrue(props.getForwardingTargetFields(0, 3).contains(1));
+               assertTrue(props.getForwardingTargetFields(1, 1).contains(2));
+               assertTrue(props.getForwardingTargetFields(1, 4).contains(3));
        }
-       
+
        @Test
-       public void CrossProjectionWithoutTypesClassTest() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<Tuple5<Integer, Long, String, Long, Integer>> 
tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-
-                       DataSet<Tuple4<String, Long, Long, Integer>> result = 
tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4);
-                       result.print();
-
-                       Plan plan = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = 
((CrossOperatorBase<?, ?, ?, ?>) sink.getInput());
-
-                       DualInputSemanticProperties props = 
projectCrossOperator.getSemanticProperties();
-
-                       assertTrue(props.getForwardedField1(2).size() == 1);
-                       assertTrue(props.getForwardedField1(3).size() == 1);
-                       assertTrue(props.getForwardedField2(1).size() == 1);
-                       assertTrue(props.getForwardedField2(4).size() == 1);
-                       assertTrue(props.getForwardedField1(2).contains(0));
-                       assertTrue(props.getForwardedField1(3).contains(1));
-                       assertTrue(props.getForwardedField2(1).contains(2));
-                       assertTrue(props.getForwardedField2(4).contains(3));
-               } catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testCrossProjectionSemProps2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, 
Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, 
nestedTupleTypeInfo);
+
+               tupleDs.cross(tupleDs)
+                               .projectFirst(2, 0)
+                               .projectSecond(1,3)
+                               .print();
+
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = 
((CrossOperatorBase<?, ?, ?, ?>) sink.getInput());
+
+               DualInputSemanticProperties props = 
projectCrossOperator.getSemanticProperties();
+
+               assertEquals(1, props.getForwardingTargetFields(0, 0).size());
+               assertNotNull(props.getForwardingTargetFields(0, 1));
+               assertNotNull(props.getForwardingTargetFields(0, 2));
+               assertNotNull(props.getForwardingTargetFields(0, 3));
+               assertEquals(1, props.getForwardingTargetFields(0, 4).size());
+               assertEquals(1, props.getForwardingTargetFields(0, 5).size());
+               assertNotNull(props.getForwardingTargetFields(0, 6));
+               assertEquals(0, props.getForwardingTargetFields(0, 1).size());
+               assertEquals(0, props.getForwardingTargetFields(0, 2).size());
+               assertEquals(0, props.getForwardingTargetFields(0, 3).size());
+               assertEquals(0, props.getForwardingTargetFields(0, 6).size());
+
+               assertNotNull(props.getForwardingTargetFields(1, 0));
+               assertEquals(1, props.getForwardingTargetFields(1, 1).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 2).size());
+               assertEquals(1, props.getForwardingTargetFields(1, 3).size());
+               assertNotNull(props.getForwardingTargetFields(1, 4));
+               assertNotNull(props.getForwardingTargetFields(1, 5));
+               assertEquals(1, props.getForwardingTargetFields(1, 6).size());
+               assertEquals(0, props.getForwardingTargetFields(1, 0).size());
+               assertEquals(0, props.getForwardingTargetFields(1, 4).size());
+               assertEquals(0, props.getForwardingTargetFields(1, 5).size());
+
+               assertTrue(props.getForwardingTargetFields(0, 4).contains(0));
+               assertTrue(props.getForwardingTargetFields(0, 5).contains(1));
+               assertTrue(props.getForwardingTargetFields(0, 0).contains(2));
+               assertTrue(props.getForwardingTargetFields(1, 1).contains(3));
+               assertTrue(props.getForwardingTargetFields(1, 2).contains(4));
+               assertTrue(props.getForwardingTargetFields(1, 3).contains(5));
+               assertTrue(props.getForwardingTargetFields(1, 6).contains(6));
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index bd8b55e..f0124e3 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -22,17 +22,18 @@ package org.apache.flink.api.java.functions;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.junit.Test;
@@ -43,247 +44,496 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 /**
  * This is a minimal test to verify that semantic annotations are evaluated 
against
  * the type information properly translated correctly to the common data flow 
API.
- * 
- * This covers only the constant fields annotations currently !!!
+ *
  */
 @SuppressWarnings("serial")
 public class SemanticPropertiesTranslationTest {
        
-       /**
-        * A mapper that preserves all fields over a tuple data set.
-        */
        @Test
-       public void translateUnaryFunctionAnnotationTuplesWildCard() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       @SuppressWarnings("unchecked")
-                       DataSet<Tuple3<Long, String, Integer>> input = 
env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
-                       input.map(new 
WildcardConstantMapper<Tuple3<Long,String,Integer>>()).print();
-                       
-                       Plan plan = env.createProgramPlan();
-                       
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, 
?, ?>) sink.getInput();
-                       
-                       SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
-                       
-                       FieldSet fw1 = semantics.getForwardedField(0);
-                       FieldSet fw2 = semantics.getForwardedField(1);
-                       FieldSet fw3 = semantics.getForwardedField(2);
-                       
-                       assertNotNull(fw1);
-                       assertNotNull(fw2);
-                       assertNotNull(fw3);
-                       
-                       assertTrue(fw1.contains(0));
-                       assertTrue(fw2.contains(1));
-                       assertTrue(fw3.contains(2));
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testUnaryFunctionWildcardForwardedAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, String, Integer>> input = 
env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
+               input.map(new 
WildcardForwardedMapper<Tuple3<Long,String,Integer>>()).print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+               FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
+               FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
+               assertNotNull(fw1);
+               assertNotNull(fw2);
+               assertNotNull(fw3);
+               assertTrue(fw1.contains(0));
+               assertTrue(fw2.contains(1));
+               assertTrue(fw3.contains(2));
        }
        
-       /**
-        * A mapper that preserves fields 0, 1, 2 of a tuple data set.
-        */
        @Test
-       public void translateUnaryFunctionAnnotationTuples() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       @SuppressWarnings("unchecked")
-                       DataSet<Tuple3<Long, String, Integer>> input = 
env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
-                       input.map(new IndividualConstantMapper<Long, String, 
Integer>()).print();
-                       
-                       Plan plan = env.createProgramPlan();
-                       
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, 
?, ?>) sink.getInput();
-                       
-                       SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
-                       
-                       FieldSet fw1 = semantics.getForwardedField(0);
-                       FieldSet fw2 = semantics.getForwardedField(1);
-                       FieldSet fw3 = semantics.getForwardedField(2);
-                       
-                       assertNotNull(fw1);
-                       assertNotNull(fw2);
-                       assertNotNull(fw3);
-                       
-                       assertTrue(fw1.contains(0));
-                       assertTrue(fw2.contains(1));
-                       assertTrue(fw3.contains(2));
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+       public void testUnaryFunctionInPlaceForwardedAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, String, Integer>> input = 
env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
+               input.map(new IndividualForwardedMapper<Long, String, 
Integer>()).print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+               FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
+               assertNotNull(fw1);
+               assertNotNull(fw2);
+               assertTrue(fw1.contains(0));
+               assertTrue(fw2.contains(2));
        }
-       
-//     /**
-//      * A mapper that preserves all fields over a data set of an atomic type.
-//      */
-//     @Test
-//     public void translateUnaryFunctionAnnotationAtomicWildCard() {
-//             try {
-//                     ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-//                     DataSet<Long> input = env.generateSequence(0, 1000);
-//                     input.map(new WildcardConstantMapper<Long>()).print();
-//                     
-//                     Plan plan = env.createProgramPlan();
-//                     
-//                     GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-//                     MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, 
?, ?>) sink.getInput();
-//                     
-//                     SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
-//                     
-//                     FieldSet fw1 = semantics.getForwardedField(0);
-//                     assertNotNull(fw1);
-//                     assertTrue(fw1.contains(0));
-//             }
-//             catch (Exception e) {
-//                     System.err.println(e.getMessage());
-//                     e.printStackTrace();
-//                     fail("Exception in test: " + e.getMessage());
-//             }
-//     }
-       
-//     /**
-//      * A mapper that preserves field zero of a data set of an atomic type.
-//      */
-//     @Test
-//     public void translateUnaryFunctionAnnotationAtomicZero() {
-//             try {
-//                     ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-//                     DataSet<Long> input = env.generateSequence(0, 1000);
-//                     input.map(new ZeroConstantMapper<Long>()).print();
-//                     
-//                     Plan plan = env.createProgramPlan();
-//                     
-//                     GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-//                     MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, 
?, ?>) sink.getInput();
-//                     
-//                     SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
-//                     
-//                     FieldSet fw1 = semantics.getForwardedField(0);
-//                     FieldSet fw2 = semantics.getForwardedField(1);
-//                     FieldSet fw3 = semantics.getForwardedField(2);
-//                     
-//                     assertNotNull(fw1);
-//                     assertNotNull(fw2);
-//                     assertNotNull(fw3);
-//                     
-//                     assertTrue(fw1.contains(0));
-//                     assertTrue(fw2.contains(1));
-//                     assertTrue(fw3.contains(2));
-//             }
-//             catch (Exception e) {
-//                     System.err.println(e.getMessage());
-//                     e.printStackTrace();
-//                     fail("Exception in test: " + e.getMessage());
-//             }
-//     }
-       
-       
-       /**
-        * A join that preserves tuple fields from both sides.
-        */
+
        @Test
-       public void translateBinaryFunctionAnnotationTuples() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       @SuppressWarnings("unchecked")
-                       DataSet<Tuple2<Long, String>> input1 = 
env.fromElements(new Tuple2<Long, String>(3l, "test"));
-                       @SuppressWarnings("unchecked")
-                       DataSet<Tuple2<Long, Double>> input2 = 
env.fromElements(new Tuple2<Long, Double>(3l, 3.1415));
-                       
-                       input1.join(input2).where(0).equalTo(0).with(new 
ForwardingTupleJoin<Long, String, Long, Double>())
+       public void testUnaryFunctionMovingForwardedAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new ShufflingMapper<Long>()).print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+               FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
+               FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
+               assertNotNull(fw1);
+               assertNotNull(fw2);
+               assertNotNull(fw3);
+               assertTrue(fw1.contains(2));
+               assertTrue(fw2.contains(0));
+               assertTrue(fw3.contains(1));
+       }
+
+       @Test
+       public void testUnaryFunctionForwardedInLine1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new NoAnnotationMapper<Tuple3<Long, Long, 
Long>>()).withForwardedFields("0->1; 2")
                                .print();
-                       
-                       Plan plan = env.createProgramPlan();
-                       
-                       GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-                       JoinOperatorBase<?, ?, ?, ?> join = 
(JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
-                       
-                       DualInputSemanticProperties semantics = 
join.getSemanticProperties();
-                       
-                       FieldSet fw11 = semantics.getForwardedField1(0);
-                       FieldSet fw12 = semantics.getForwardedField1(1);
-                       FieldSet fw21 = semantics.getForwardedField2(0);
-                       FieldSet fw22 = semantics.getForwardedField2(1);
-                       
-                       assertNull(fw11);
-                       assertNull(fw21);
-                       
-                       assertNotNull(fw12);
-                       assertNotNull(fw22);
-                       
-                       assertTrue(fw12.contains(0));
-                       assertTrue(fw22.contains(1));
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Exception in test: " + e.getMessage());
-               }
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+               FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
+               assertNotNull(fw1);
+               assertNotNull(fw2);
+               assertTrue(fw1.contains(1));
+               assertTrue(fw2.contains(2));
        }
-       
-//     /**
-//      * A join that preserves atomic fields from both sides.
-//      */
-//     @Test
-//     public void translateBinaryFunctionAnnotationAtomicType() {
-//             try {
-//                     ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-//                     
-//                     DataSet<Long> input1 = env.generateSequence(0, 1000);
-//                     DataSet<Long> input2 = env.generateSequence(0, 1000);
-//                     
-//                     input1.join(input2)
-//                                     .where(new KeySelector<Long, Long>() { 
public Long getKey(Long value) { return value; }})
-//                                     .equalTo(new KeySelector<Long, Long>() 
{ public Long getKey(Long value) { return value; }})
-//                                     .with(new ForwardingBasicJoin<Long, 
Long>())
-//                             .print();
-//                     
-//                     Plan plan = env.createProgramPlan();
-//                     
-//                     GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
-//                     JoinOperatorBase<?, ?, ?, ?> join = 
(JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
-//                     
-//                     DualInputSemanticProperties semantics = 
join.getSemanticProperties();
-//                     
-//                     FieldSet fw11 = semantics.getForwardedField1(0);
-//                     FieldSet fw12 = semantics.getForwardedField1(1);
-//                     FieldSet fw21 = semantics.getForwardedField2(0);
-//                     FieldSet fw22 = semantics.getForwardedField2(1);
-//                     
-//                     assertNull(fw11);
-//                     assertNull(fw21);
-//                     
-//                     assertNotNull(fw12);
-//                     assertNotNull(fw22);
-//                     
-//                     assertTrue(fw12.contains(0));
-//                     assertTrue(fw22.contains(1));
-//             }
-//             catch (Exception e) {
-//                     System.err.println(e.getMessage());
-//                     e.printStackTrace();
-//                     fail("Exception in test: " + e.getMessage());
-//             }
-//     }
-       
+
+       @Test
+       public void testUnaryFunctionForwardedInLine2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new ReadSetMapper<Tuple3<Long, Long, 
Long>>()).withForwardedFields("0->1; 2")
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+               FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
+               assertNotNull(fw1);
+               assertNotNull(fw2);
+               assertTrue(fw1.contains(1));
+               assertTrue(fw2.contains(2));
+       }
+
+       @Test
+       public void testUnaryFunctionForwardedInLine3() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new ReadSetMapper<Tuple3<Long, Long, 
Long>>()).withForwardedFields("0->1; 2")
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+               FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
+               assertNotNull(fw1);
+               assertNotNull(fw2);
+               assertTrue(fw1.contains(1));
+               assertTrue(fw2.contains(2));
+       }
+
+       @Test
+       public void testUnaryFunctionAllForwardedExceptAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, 
Long>>()).print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
+               FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
+               assertNotNull(fw1);
+               assertNotNull(fw2);
+               assertTrue(fw1.contains(0));
+               assertTrue(fw2.contains(2));
+       }
+
+       @Test
+       public void testUnaryFunctionReadFieldsAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new ReadSetMapper<Tuple3<Long, Long, 
Long>>()).print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) 
sink.getInput();
+
+               SingleInputSemanticProperties semantics = 
mapper.getSemanticProperties();
+
+               FieldSet read = semantics.getReadFields(0);
+               assertNotNull(read);
+               assertEquals(2, read.size());
+               assertTrue(read.contains(0));
+               assertTrue(read.contains(2));
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testUnaryForwardedOverwritingInLine1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new WildcardForwardedMapper<Tuple3<Long, Long, 
Long>>()).withForwardedFields("0->1; 2");
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testUnaryForwardedOverwritingInLine2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, 
Long>>()).withForwardedFields("0->1; 2");
+       }
+
+       @Test
+       public void testBinaryForwardedAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, String>> input1 = env.fromElements(new 
Tuple2<Long, String>(3l, "test"));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Double>> input2 = env.fromElements(new 
Tuple2<Long, Double>(3l, 3.1415));
+               input1.join(input2).where(0).equalTo(0).with(new 
ForwardedBothAnnotationJoin<Long, String, Long, Double>())
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, 
?>) sink.getInput();
+
+               DualInputSemanticProperties semantics = 
join.getSemanticProperties();
+               assertNotNull(semantics.getForwardingTargetFields(0, 0));
+               assertNotNull(semantics.getForwardingTargetFields(1, 0));
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
1).size());
+               assertEquals(1, semantics.getForwardingTargetFields(1, 
1).size());
+               assertTrue(semantics.getForwardingTargetFields(0, 
1).contains(0));
+               assertTrue(semantics.getForwardingTargetFields(1, 
1).contains(1));
+               assertEquals(0, semantics.getForwardingTargetFields(0, 
0).size());
+               assertEquals(0, semantics.getForwardingTargetFields(1, 
0).size());
+       }
+
+       @Test
+       public void testBinaryForwardedInLine1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
NoAnnotationJoin<Long>())
+                               .withForwardedFieldsFirst("0->1; 
1->2").withForwardedFieldsSecond("1->0")
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, 
?>) sink.getInput();
+
+               DualInputSemanticProperties semantics = 
join.getSemanticProperties();
+               assertNotNull(semantics.getForwardingTargetFields(1, 0));
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
0).size());
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
1).size());
+               assertEquals(1, semantics.getForwardingTargetFields(1, 
1).size());
+               assertTrue(semantics.getForwardingTargetFields(0, 
0).contains(1));
+               assertTrue(semantics.getForwardingTargetFields(0, 
1).contains(2));
+               assertTrue(semantics.getForwardingTargetFields(1, 
1).contains(0));
+               assertEquals(0, semantics.getForwardingTargetFields(1, 
0).size());
+       }
+
+       @Test
+       public void testBinaryForwardedInLine2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ReadSetJoin<Long>())
+                               .withForwardedFieldsFirst("0->1; 
1->2").withForwardedFieldsSecond("1->0")
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, 
?>) sink.getInput();
+
+               DualInputSemanticProperties semantics = 
join.getSemanticProperties();
+               assertNotNull(semantics.getForwardingTargetFields(1, 0));
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
0).size());
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
1).size());
+               assertEquals(1, semantics.getForwardingTargetFields(1, 
1).size());
+               assertTrue(semantics.getForwardingTargetFields(0, 
0).contains(1));
+               assertTrue(semantics.getForwardingTargetFields(0, 
1).contains(2));
+               assertTrue(semantics.getForwardingTargetFields(1, 
1).contains(0));
+               assertNotNull(semantics.getReadFields(0));
+               assertNotNull(semantics.getReadFields(1));
+               assertEquals(1, semantics.getReadFields(0).size());
+               assertEquals(1, semantics.getReadFields(1).size());
+               assertTrue(semantics.getReadFields(0).contains(1));
+               assertTrue(semantics.getReadFields(1).contains(0));
+               assertEquals(0, semantics.getForwardingTargetFields(1, 
0).size());
+       }
+
+       @Test
+       public void testBinaryForwardedAnnotationInLineMixed1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ForwardedFirstAnnotationJoin<Long>())
+                               .withForwardedFieldsSecond("1")
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, 
?>) sink.getInput();
+
+               DualInputSemanticProperties semantics = 
join.getSemanticProperties();
+               assertNotNull(semantics.getForwardingTargetFields(0, 1));
+               assertNotNull(semantics.getForwardingTargetFields(1, 0));
+               assertNotNull(semantics.getForwardingTargetFields(0, 0));
+               assertNotNull(semantics.getForwardingTargetFields(1, 1));
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
0).size());
+               assertEquals(1, semantics.getForwardingTargetFields(1, 
1).size());
+               assertTrue(semantics.getForwardingTargetFields(0, 
0).contains(2));
+               assertTrue(semantics.getForwardingTargetFields(1, 
1).contains(1));
+               assertEquals(0, semantics.getForwardingTargetFields(0, 
1).size());
+               assertEquals(0, semantics.getForwardingTargetFields(1, 
0).size());
+
+       }
+
+       @Test
+       public void testBinaryForwardedAnnotationInLineMixed2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ForwardedSecondAnnotationJoin<Long>())
+                               .withForwardedFieldsFirst("0->1")
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, 
?>) sink.getInput();
+
+               DualInputSemanticProperties semantics = 
join.getSemanticProperties();
+               assertNotNull(semantics.getForwardingTargetFields(0, 1));
+               assertNotNull(semantics.getForwardingTargetFields(1, 0));
+               assertNotNull(semantics.getForwardingTargetFields(0, 0));
+               assertNotNull(semantics.getForwardingTargetFields(1, 1));
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
0).size());
+               assertEquals(1, semantics.getForwardingTargetFields(1, 
1).size());
+               assertTrue(semantics.getForwardingTargetFields(0, 
0).contains(1));
+               assertTrue(semantics.getForwardingTargetFields(1, 
1).contains(2));
+               assertEquals(0, semantics.getForwardingTargetFields(0, 
1).size());
+               assertEquals(0, semantics.getForwardingTargetFields(1, 
0).size());
+       }
+
+       @Test
+       public void testBinaryAllForwardedExceptAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 4l, 5l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input1.join(input2).where(0).equalTo(0).with(new 
AllForwardedExceptJoin<Long>())
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, 
?>) sink.getInput();
+
+               DualInputSemanticProperties semantics = 
join.getSemanticProperties();
+               assertNotNull(semantics.getForwardingTargetFields(0, 0));
+               assertNotNull(semantics.getForwardingTargetFields(0, 2));
+               assertNotNull(semantics.getForwardingTargetFields(1, 0));
+               assertNotNull(semantics.getForwardingTargetFields(1, 1));
+               assertEquals(1, semantics.getForwardingTargetFields(0, 
1).size());
+               assertEquals(1, semantics.getForwardingTargetFields(1, 
2).size());
+               assertTrue(semantics.getForwardingTargetFields(0, 
1).contains(1));
+               assertTrue(semantics.getForwardingTargetFields(1, 
2).contains(2));
+               assertEquals(0, semantics.getForwardingTargetFields(0, 
0).size());
+               assertEquals(0, semantics.getForwardingTargetFields(0, 
2).size());
+               assertEquals(0, semantics.getForwardingTargetFields(1, 
0).size());
+               assertEquals(0, semantics.getForwardingTargetFields(1, 
1).size());
+       }
+
+       @Test
+       public void testBinaryReadFieldsAnnotation() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ReadSetJoin<Long>())
+                               .print();
+               Plan plan = env.createProgramPlan();
+
+               GenericDataSinkBase<?> sink = 
plan.getDataSinks().iterator().next();
+               JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, 
?>) sink.getInput();
+
+               DualInputSemanticProperties semantics = 
join.getSemanticProperties();
+               assertNotNull(semantics.getReadFields(0));
+               assertNotNull(semantics.getReadFields(1));
+               assertEquals(1, semantics.getReadFields(0).size());
+               assertEquals(1, semantics.getReadFields(1).size());
+               assertTrue(semantics.getReadFields(0).contains(1));
+               assertTrue(semantics.getReadFields(1).contains(0));
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testBinaryForwardedOverwritingInLine1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ForwardedFirstAnnotationJoin<Long>())
+                               .withForwardedFieldsFirst("0->1");
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testBinaryForwardedOverwritingInLine2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ForwardedSecondAnnotationJoin<Long>())
+                               .withForwardedFieldsSecond("0->1");
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testBinaryForwardedOverwritingInLine3() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ForwardedBothAnnotationJoin<Long, Long, Long, Long>())
+                               .withForwardedFieldsFirst("0->1;");
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testBinaryForwardedOverwritingInLine4() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new 
Tuple2<Long, Long>(3l, 4l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new 
Tuple2<Long, Long>(3l, 2l));
+               input1.join(input2).where(0).equalTo(0).with(new 
ForwardedBothAnnotationJoin<Long, Long, Long, Long>())
+                               .withForwardedFieldsSecond("0->1;");
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testBinaryForwardedOverwritingInLine5() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 4l, 5l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input1.join(input2).where(0).equalTo(0).with(new 
AllForwardedExceptJoin<Long>())
+                               .withForwardedFieldsFirst("0->1;");
+       }
+
+       @Test(expected = 
SemanticProperties.InvalidSemanticAnnotationException.class)
+       public void testBinaryForwardedOverwritingInLine6() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 4l, 5l));
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new 
Tuple3<Long, Long, Long>(3l, 2l, 1l));
+               input1.join(input2).where(0).equalTo(0).with(new 
AllForwardedExceptJoin<Long>())
+                               .withForwardedFieldsSecond("0->1;");
+       }
+
        // 
--------------------------------------------------------------------------------------------
+
+       public static class NoAnnotationMapper<T> implements MapFunction<T, T> {
+
+               @Override
+               public T map(T value)  {
+                       return value;
+               }
+       }
        
-       
-       @ConstantFields("*")
-       public static class WildcardConstantMapper<T> extends 
RichMapFunction<T, T> {
+       @ForwardedFields("*")
+       public static class WildcardForwardedMapper<T> implements 
MapFunction<T, T> {
 
                @Override
                public T map(T value)  {
@@ -291,41 +541,95 @@ public class SemanticPropertiesTranslationTest {
                }
        }
        
-       @ConstantFields("0->0;1->1;2->2")
-       public static class IndividualConstantMapper<X, Y, Z> extends 
RichMapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
+       @ForwardedFields("0;2")
+       public static class IndividualForwardedMapper<X, Y, Z> implements 
MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
 
                @Override
                public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> value) {
                        return value;
                }
        }
-       
-       @ConstantFields("0")
-       public static class ZeroConstantMapper<T> extends RichMapFunction<T, T> 
{
+
+       @ForwardedFields("0->2;1->0;2->1")
+       public static class ShufflingMapper<X> implements MapFunction<Tuple3<X, 
X, X>, Tuple3<X, X, X>> {
+
+               @Override
+               public Tuple3<X, X, X> map(Tuple3<X, X, X> value) {
+                       return value;
+               }
+       }
+
+       @FunctionAnnotation.NonForwardedFields({"1"})
+       public static class AllForwardedExceptMapper<T> implements 
MapFunction<T, T> {
+
+               @Override
+               public T map(T value)  {
+                       return value;
+               }
+       }
+
+       @FunctionAnnotation.ReadFields({"0;2"})
+       public static class ReadSetMapper<T> implements MapFunction<T, T> {
 
                @Override
                public T map(T value)  {
                        return value;
                }
        }
-       
-       @ConstantFieldsFirst("1 -> 0")
-       @ConstantFieldsSecond("1 -> 1")
-       public static class ForwardingTupleJoin<A, B, C, D> extends 
RichJoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
+
+       public static class NoAnnotationJoin<X> implements 
JoinFunction<Tuple2<X,X>, Tuple2<X,X>, Tuple3<X,X,X>> {
+
+               @Override
+               public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> 
second) throws Exception {
+                       return null;
+               }
+       }
+
+       @ForwardedFieldsFirst("0->2")
+       public static class ForwardedFirstAnnotationJoin<X> implements 
JoinFunction<Tuple2<X,X>, Tuple2<X,X>, Tuple3<X,X,X>> {
+
+               @Override
+               public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> 
second) throws Exception {
+                       return null;
+               }
+       }
+
+       @ForwardedFieldsSecond("1->2")
+       public static class ForwardedSecondAnnotationJoin<X> implements 
JoinFunction<Tuple2<X,X>, Tuple2<X,X>, Tuple3<X,X,X>> {
+
+               @Override
+               public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> 
second) throws Exception {
+                       return null;
+               }
+       }
+
+       @ForwardedFieldsFirst("1 -> 0")
+       @ForwardedFieldsSecond("1 -> 1")
+       public static class ForwardedBothAnnotationJoin<A, B, C, D> implements 
JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
 
                @Override
                public Tuple2<B, D> join(Tuple2<A, B> first, Tuple2<C, D> 
second) {
                        return new Tuple2<B, D>(first.f1, second.f1);
                }
        }
-       
-       @ConstantFieldsFirst("0 -> 0")
-       @ConstantFieldsSecond("0 -> 1")
-       public static class ForwardingBasicJoin<A, B> extends 
RichJoinFunction<A, B, Tuple2<A, B>> {
+
+       @FunctionAnnotation.NonForwardedFieldsFirst("0;2")
+       @FunctionAnnotation.NonForwardedFieldsSecond("0;1")
+       public static class AllForwardedExceptJoin<X> implements 
JoinFunction<Tuple3<X,X,X>, Tuple3<X,X,X>, Tuple3<X,X,X>> {
+
+               @Override
+               public Tuple3<X, X, X> join(Tuple3<X, X, X> first, Tuple3<X, X, 
X> second) throws Exception {
+                       return null;
+               }
+       }
+
+       @FunctionAnnotation.ReadFieldsFirst("1")
+       @FunctionAnnotation.ReadFieldsSecond("0")
+       public static class ReadSetJoin<X> implements JoinFunction<Tuple2<X,X>, 
Tuple2<X,X>, Tuple3<X,X,X>> {
 
                @Override
-               public Tuple2<A, B> join(A first, B second) {
-                       return new Tuple2<A, B>(first, second);
+               public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> 
second) throws Exception {
+                       return null;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
index c8e5ba3..67d0240 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -106,16 +106,14 @@ public class KeysTest {
                                new String[] {"f11"},
                                new String[] {"f-35"},
                                new String[] {"f0.f33"},
-                               new String[] {"f1.f33"},
-                               new String[] {"f1"} // select full tuple 
without saying "f1.*"
+                               new String[] {"f1.f33"}
                };
                for(int i = 0; i < tests.length; i++) {
                        Throwable e = null;
                        try {
                                new ExpressionKeys<Tuple3<String, 
Tuple3<String, String, String>, String>>(tests[i], typeInfo);
                        } catch(Throwable t) {
-                               // System.err.println("Message: 
"+t.getMessage()); t.printStackTrace();
-                               e = t;  
+                               e = t;
                        }
                        Assert.assertNotNull(e);
                }
@@ -127,16 +125,14 @@ public class KeysTest {
                
                String[][] tests = new String[][] {
                                new String[] {"nonexistent"},
-                               new String[] {"date.abc"}, // nesting into 
unnested
-                               new String[] {"word"} // select full tuple 
without saying "f1.*"
+                               new String[] {"date.abc"} // nesting into 
unnested
                };
                for(int i = 0; i < tests.length; i++) {
                        Throwable e = null;
                        try {
                                new 
ExpressionKeys<ComplexNestedClass>(tests[i], ti);
                        } catch(Throwable t) {
-                               // System.err.println("Message: 
"+t.getMessage()); t.printStackTrace();
-                               e = t;  
+                               e = t;
                        }
                        Assert.assertNotNull(e);
                }
@@ -214,6 +210,9 @@ public class KeysTest {
                
                complexFpk = new ExpressionKeys<Tuple3<String, 
Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] 
{"f1.f0.*"}, complexTypeInfo);
                Assert.assertArrayEquals(new int[] {1,2,3}, 
complexFpk.computeLogicalKeyPositions());
+
+               complexFpk = new ExpressionKeys<Tuple3<String, 
Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] 
{"f1.f0"}, complexTypeInfo);
+               Assert.assertArrayEquals(new int[] {1,2,3}, 
complexFpk.computeLogicalKeyPositions());
                
                complexFpk = new ExpressionKeys<Tuple3<String, 
Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] 
{"f2"}, complexTypeInfo);
                Assert.assertArrayEquals(new int[] {6}, 
complexFpk.computeLogicalKeyPositions());
@@ -246,6 +245,12 @@ public class KeysTest {
                ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"p2.*"}, ti);
                Assert.assertArrayEquals(new int[] {3,4}, 
ek.computeLogicalKeyPositions());
 
+               ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"p1"}, ti);
+               Assert.assertArrayEquals(new int[] {1,2}, 
ek.computeLogicalKeyPositions());
+
+               ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"p2"}, ti);
+               Assert.assertArrayEquals(new int[] {3,4}, 
ek.computeLogicalKeyPositions());
+
                ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"i0"}, ti);
                Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
index 9834a25..3f7e7c5 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
@@ -149,23 +149,8 @@ public class CoGroupWrappingFunctionTest {
                                CoGroupOperator coGroupOp = 
CoGroupOperator.builder(new TestCoGroupFunction(), LongValue.class, 1, 
2).build();
                                
                                DualInputSemanticProperties props = 
coGroupOp.getSemanticProperties();
-                               FieldSet fw2 = props.getForwardedField1(2);
-                               FieldSet fw4 = props.getForwardedField2(4);
-                               
-                               assertNotNull(fw2);
-                               assertNotNull(fw4);
-                               assertEquals(1, fw2.size());
-                               assertEquals(1, fw4.size());
-                               assertTrue(fw2.contains(2));
-                               assertTrue(fw4.contains(4));
-                       }
-                       {
-                               CoGroupOperator coGroupOp = 
CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 
2).build();
-                               
-                               DualInputSemanticProperties props = 
coGroupOp.getSemanticProperties();
-                               FieldSet fw2 = props.getForwardedField1(2);
-                               FieldSet fw4 = props.getForwardedField2(4);
-                               
+                               FieldSet fw2 = 
props.getForwardingTargetFields(0, 2);
+                               FieldSet fw4 = 
props.getForwardingTargetFields(1, 4);
                                assertNotNull(fw2);
                                assertNotNull(fw4);
                                assertEquals(1, fw2.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
index 9e262fc..f413b81 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -159,23 +159,8 @@ public class ReduceWrappingFunctionTest {
                                ReduceOperator reduceOp = 
ReduceOperator.builder(new TestReduceFunction()).build();
                                
                                SingleInputSemanticProperties props = 
reduceOp.getSemanticProperties();
-                               FieldSet fw2 = props.getForwardedField(2);
-                               FieldSet fw4 = props.getForwardedField(4);
-                               
-                               assertNotNull(fw2);
-                               assertNotNull(fw4);
-                               assertEquals(1, fw2.size());
-                               assertEquals(1, fw4.size());
-                               assertTrue(fw2.contains(2));
-                               assertTrue(fw4.contains(4));
-                       }
-                       {
-                               ReduceOperator reduceOp = 
ReduceOperator.builder(TestReduceFunction.class).build();
-                               
-                               SingleInputSemanticProperties props = 
reduceOp.getSemanticProperties();
-                               FieldSet fw2 = props.getForwardedField(2);
-                               FieldSet fw4 = props.getForwardedField(4);
-                               
+                               FieldSet fw2 = 
props.getForwardingTargetFields(0, 2);
+                               FieldSet fw4 = 
props.getForwardingTargetFields(0, 4);
                                assertNotNull(fw2);
                                assertNotNull(fw4);
                                assertEquals(1, fw2.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 27db31d..1f3f71c 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -82,7 +82,7 @@ public class PojoTypeExtractionTest {
                public static int ignoreStaticField;
                public transient int ignoreTransientField;
                public Date date; // generic type
-               public Integer someNumber; // BasicType
+               public Integer someNumberWithÜnicödeNäme; // BasicType
                public float someFloat; // BasicType
                public Tuple3<Long, Long, String> word; //Tuple Type with three 
basic types
                public Object nothing; // generic type
@@ -226,7 +226,7 @@ public class PojoTypeExtractionTest {
                                "complex.collection",
                                "complex.nothing",
                                "complex.someFloat",
-                               "complex.someNumber",
+                               "complex.someNumberWithÜnicödeNäme",
                                "complex.word.f0",
                                "complex.word.f1",
                                "complex.word.f2"};
@@ -242,13 +242,13 @@ public class PojoTypeExtractionTest {
                                8};
                Assert.assertEquals(fields.length, positions.length);
                for(int i = 0; i < fields.length; i++) {
-                       pojoType.getKey(fields[i], 0, ffd);
+                       pojoType.getFlatFields(fields[i], 0, ffd);
                        Assert.assertEquals("Too many keys returned", 1, 
ffd.size());
                        Assert.assertEquals("position of field "+fields[i]+" 
wrong", positions[i], ffd.get(0).getPosition());
                        ffd.clear();
                }
 
-               pojoType.getKey("complex.word.*", 0, ffd);
+               pojoType.getFlatFields("complex.word.*", 0, ffd);
                Assert.assertEquals(3, ffd.size());
                // check if it returns 5,6,7
                for(FlatFieldDescriptor ffdE : ffd) {
@@ -268,11 +268,11 @@ public class PojoTypeExtractionTest {
                ffd.clear();
 
                // scala style full tuple selection for pojos
-               pojoType.getKey("complex.word._", 0, ffd);
+               pojoType.getFlatFields("complex.word._", 0, ffd);
                Assert.assertEquals(3, ffd.size());
                ffd.clear();
 
-               pojoType.getKey("complex.*", 0, ffd);
+               pojoType.getFlatFields("complex.*", 0, ffd);
                Assert.assertEquals(9, ffd.size());
                // check if it returns 0-7
                for(FlatFieldDescriptor ffdE : ffd) {
@@ -313,7 +313,7 @@ public class PojoTypeExtractionTest {
                }
                ffd.clear();
 
-               pojoType.getKey("*", 0, ffd);
+               pojoType.getFlatFields("*", 0, ffd);
                Assert.assertEquals(10, ffd.size());
                // check if it returns 0-8
                for(FlatFieldDescriptor ffdE : ffd) {
@@ -344,7 +344,7 @@ public class PojoTypeExtractionTest {
                                dateSeen = true;
                                
Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type);
                                Assert.assertEquals(Date.class, 
field.type.getTypeClass());
-                       } else if(name.equals("someNumber")) {
+                       } else if(name.equals("someNumberWithÜnicödeNäme")) {
                                if(intSeen) {
                                        Assert.fail("already seen");
                                }
@@ -450,7 +450,7 @@ public class PojoTypeExtractionTest {
                                strArraySeen = true;
                                
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
                                Assert.assertEquals(String[].class, 
field.type.getTypeClass());
-                       } else if(Arrays.asList("date", "someNumber", 
"someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
+                       } else if(Arrays.asList("date", 
"someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", 
"hadoopCitizen", "collection").contains(name)) {
                                // ignore these, they are inherited from the 
ComplexNestedClass
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 8a2d675..bc36241 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -148,7 +148,7 @@ public class TypeExtractorTest {
                Assert.assertEquals(9, ti.getArity());
                Assert.assertTrue(ti instanceof TupleTypeInfo);
                List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>();
-               ((TupleTypeInfo) ti).getKey("f3", 0, ffd);
+               ((TupleTypeInfo) ti).getFlatFields("f3", 0, ffd);
                Assert.assertTrue(ffd.size() == 1);
                Assert.assertEquals(3, ffd.get(0).getPosition() );
 
@@ -215,16 +215,16 @@ public class TypeExtractorTest {
                Assert.assertTrue(ti instanceof TupleTypeInfo);
                List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>();
                
-               ((TupleTypeInfo) ti).getKey("f0.f0", 0, ffd);
+               ((TupleTypeInfo) ti).getFlatFields("f0.f0", 0, ffd);
                Assert.assertEquals(0, ffd.get(0).getPosition() );
                ffd.clear();
                
-               ((TupleTypeInfo) ti).getKey("f0.f0", 0, ffd);
+               ((TupleTypeInfo) ti).getFlatFields("f0.f0", 0, ffd);
                Assert.assertTrue( ffd.get(0).getType() instanceof 
BasicTypeInfo );
                Assert.assertTrue( 
ffd.get(0).getType().getTypeClass().equals(String.class) );
                ffd.clear();
                
-               ((TupleTypeInfo) ti).getKey("f1.f0", 0, ffd);
+               ((TupleTypeInfo) ti).getFlatFields("f1.f0", 0, ffd);
                Assert.assertEquals(1, ffd.get(0).getPosition() );
                ffd.clear();
 
@@ -384,19 +384,19 @@ public class TypeExtractorTest {
                Assert.assertEquals(Tuple2.class, tti.getTypeClass());
                List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>();
                
-               tti.getKey("f0", 0, ffd);
+               tti.getFlatFields("f0", 0, ffd);
                Assert.assertEquals(1, ffd.size());
                Assert.assertEquals(0, ffd.get(0).getPosition() ); // Long
                Assert.assertTrue( 
ffd.get(0).getType().getTypeClass().equals(Long.class) );
                ffd.clear();
                
-               tti.getKey("f1.myField1", 0, ffd);
+               tti.getFlatFields("f1.myField1", 0, ffd);
                Assert.assertEquals(1, ffd.get(0).getPosition() );
                Assert.assertTrue( 
ffd.get(0).getType().getTypeClass().equals(String.class) );
                ffd.clear();
                
                
-               tti.getKey("f1.myField2", 0, ffd);
+               tti.getFlatFields("f1.myField2", 0, ffd);
                Assert.assertEquals(2, ffd.get(0).getPosition() );
                Assert.assertTrue( 
ffd.get(0).getType().getTypeClass().equals(Integer.class) );
                

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
new file mode 100644
index 0000000..c71625a
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.junit.Test;
+
+public class CompositeTypeTest {
+
+       private final TupleTypeInfo<?> tupleTypeInfo = new 
TupleTypeInfo<Tuple4<Integer, Integer, Integer, Integer>>(
+                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO);
+
+       private final TupleTypeInfo<Tuple3<Integer, String, Long>> 
inNestedTuple1 = new TupleTypeInfo<Tuple3<Integer, String, Long>>(
+                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+       private final TupleTypeInfo<Tuple2<Double, Double>> inNestedTuple2 = 
new TupleTypeInfo<Tuple2<Double, Double>>(
+                       BasicTypeInfo.DOUBLE_TYPE_INFO, 
BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+       private final TupleTypeInfo<?> nestedTypeInfo = new 
TupleTypeInfo<Tuple4<Integer, Tuple3<Integer, String, Long>, Integer, 
Tuple2<Double, Double>>>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       inNestedTuple1,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       inNestedTuple2);
+
+       private final TupleTypeInfo<Tuple2<Integer, Tuple2<Integer, Integer>>> 
inNestedTuple3 = new TupleTypeInfo<Tuple2<Integer, Tuple2<Integer, Integer>>>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       new TupleTypeInfo<Tuple2<Integer, 
Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+
+       private final TupleTypeInfo<?> deepNestedTupleTypeInfo = new 
TupleTypeInfo<Tuple3<Integer, Tuple2<Integer, Tuple2<Integer, Integer>>, 
Integer>>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       inNestedTuple3,
+                       BasicTypeInfo.INT_TYPE_INFO     );
+
+       private final PojoTypeInfo<?> pojoTypeInfo = 
((PojoTypeInfo<?>)TypeExtractor.getForClass(MyPojo.class));
+
+       private final TupleTypeInfo<?> pojoInTupleTypeInfo = new 
TupleTypeInfo<Tuple2<Integer, MyPojo>>(BasicTypeInfo.INT_TYPE_INFO, 
pojoTypeInfo);
+
+       @Test
+       public void testGetFlatFields() {
+
+               assertEquals(0, 
tupleTypeInfo.getFlatFields("0").get(0).getPosition());
+               assertEquals(1, 
tupleTypeInfo.getFlatFields("1").get(0).getPosition());
+               assertEquals(2, 
tupleTypeInfo.getFlatFields("2").get(0).getPosition());
+               assertEquals(3, 
tupleTypeInfo.getFlatFields("3").get(0).getPosition());
+               assertEquals(0, 
tupleTypeInfo.getFlatFields("f0").get(0).getPosition());
+               assertEquals(1, 
tupleTypeInfo.getFlatFields("f1").get(0).getPosition());
+               assertEquals(2, 
tupleTypeInfo.getFlatFields("f2").get(0).getPosition());
+               assertEquals(3, 
tupleTypeInfo.getFlatFields("f3").get(0).getPosition());
+
+               assertEquals(0, 
nestedTypeInfo.getFlatFields("0").get(0).getPosition());
+               assertEquals(1, 
nestedTypeInfo.getFlatFields("1.0").get(0).getPosition());
+               assertEquals(2, 
nestedTypeInfo.getFlatFields("1.1").get(0).getPosition());
+               assertEquals(3, 
nestedTypeInfo.getFlatFields("1.2").get(0).getPosition());
+               assertEquals(4, 
nestedTypeInfo.getFlatFields("2").get(0).getPosition());
+               assertEquals(5, 
nestedTypeInfo.getFlatFields("3.0").get(0).getPosition());
+               assertEquals(6, 
nestedTypeInfo.getFlatFields("3.1").get(0).getPosition());
+               assertEquals(4, 
nestedTypeInfo.getFlatFields("f2").get(0).getPosition());
+               assertEquals(5, 
nestedTypeInfo.getFlatFields("f3.f0").get(0).getPosition());
+               assertEquals(3, nestedTypeInfo.getFlatFields("1").size());
+               assertEquals(1, 
nestedTypeInfo.getFlatFields("1").get(0).getPosition());
+               assertEquals(2, 
nestedTypeInfo.getFlatFields("1").get(1).getPosition());
+               assertEquals(3, 
nestedTypeInfo.getFlatFields("1").get(2).getPosition());
+               assertEquals(3, nestedTypeInfo.getFlatFields("1.*").size());
+               assertEquals(1, 
nestedTypeInfo.getFlatFields("1.*").get(0).getPosition());
+               assertEquals(2, 
nestedTypeInfo.getFlatFields("1.*").get(1).getPosition());
+               assertEquals(3, 
nestedTypeInfo.getFlatFields("1.*").get(2).getPosition());
+               assertEquals(2, nestedTypeInfo.getFlatFields("3").size());
+               assertEquals(5, 
nestedTypeInfo.getFlatFields("3").get(0).getPosition());
+               assertEquals(6, 
nestedTypeInfo.getFlatFields("3").get(1).getPosition());
+               assertEquals(3, nestedTypeInfo.getFlatFields("f1").size());
+               assertEquals(1, 
nestedTypeInfo.getFlatFields("f1").get(0).getPosition());
+               assertEquals(2, 
nestedTypeInfo.getFlatFields("f1").get(1).getPosition());
+               assertEquals(3, 
nestedTypeInfo.getFlatFields("f1").get(2).getPosition());
+               assertEquals(2, nestedTypeInfo.getFlatFields("f3").size());
+               assertEquals(5, 
nestedTypeInfo.getFlatFields("f3").get(0).getPosition());
+               assertEquals(6, 
nestedTypeInfo.getFlatFields("f3").get(1).getPosition());
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO,
+                               
nestedTypeInfo.getFlatFields("0").get(0).getType());
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO,
+                               
nestedTypeInfo.getFlatFields("1.1").get(0).getType());
+               assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+                               
nestedTypeInfo.getFlatFields("1").get(2).getType());
+               assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO,
+                               
nestedTypeInfo.getFlatFields("3").get(1).getType());
+
+               assertEquals(3, 
deepNestedTupleTypeInfo.getFlatFields("1").size());
+               assertEquals(1, 
deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition());
+               assertEquals(2, 
deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition());
+               assertEquals(3, 
deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition());
+               assertEquals(5, 
deepNestedTupleTypeInfo.getFlatFields("*").size());
+               assertEquals(0, 
deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition());
+               assertEquals(1, 
deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition());
+               assertEquals(2, 
deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition());
+               assertEquals(3, 
deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition());
+               assertEquals(4, 
deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition());
+
+               assertEquals(0, 
pojoTypeInfo.getFlatFields("a").get(0).getPosition());
+               assertEquals(1, 
pojoTypeInfo.getFlatFields("b").get(0).getPosition());
+               assertEquals(2, pojoTypeInfo.getFlatFields("*").size());
+               assertEquals(0, 
pojoTypeInfo.getFlatFields("*").get(0).getPosition());
+               assertEquals(1, 
pojoTypeInfo.getFlatFields("*").get(1).getPosition());
+
+               assertEquals(1, 
pojoInTupleTypeInfo.getFlatFields("f1.a").get(0).getPosition());
+               assertEquals(2, 
pojoInTupleTypeInfo.getFlatFields("1.b").get(0).getPosition());
+               assertEquals(2, pojoInTupleTypeInfo.getFlatFields("1").size());
+               assertEquals(1, 
pojoInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition());
+               assertEquals(2, 
pojoInTupleTypeInfo.getFlatFields("1").get(1).getPosition());
+               assertEquals(2, 
pojoInTupleTypeInfo.getFlatFields("f1.*").size());
+               assertEquals(1, 
pojoInTupleTypeInfo.getFlatFields("f1.*").get(0).getPosition());
+               assertEquals(2, 
pojoInTupleTypeInfo.getFlatFields("f1").get(1).getPosition());
+               assertEquals(3, pojoInTupleTypeInfo.getFlatFields("*").size());
+               assertEquals(0, 
pojoInTupleTypeInfo.getFlatFields("*").get(0).getPosition());
+               assertEquals(1, 
pojoInTupleTypeInfo.getFlatFields("*").get(1).getPosition());
+               assertEquals(2, 
pojoInTupleTypeInfo.getFlatFields("*").get(2).getPosition());
+
+       }
+
+       @Test
+       public void testFieldAtStringRef() {
+
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("0"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("2"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("f1"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("f3"));
+
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("0"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.0"));
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.1"));
+               assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.2"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("2"));
+               assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("3.0"));
+               assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("3.1"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("f2"));
+               assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("f3.f0"));
+               assertEquals(inNestedTuple1, nestedTypeInfo.getTypeAt("1"));
+               assertEquals(inNestedTuple2, nestedTypeInfo.getTypeAt("3"));
+               assertEquals(inNestedTuple1, nestedTypeInfo.getTypeAt("f1"));
+               assertEquals(inNestedTuple2, nestedTypeInfo.getTypeAt("f3"));
+
+               assertEquals(inNestedTuple3, 
deepNestedTupleTypeInfo.getTypeAt("1"));
+
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
pojoTypeInfo.getTypeAt("a"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
pojoTypeInfo.getTypeAt("b"));
+
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
pojoInTupleTypeInfo.getTypeAt("f1.a"));
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
pojoInTupleTypeInfo.getTypeAt("1.b"));
+               assertEquals(pojoTypeInfo, pojoInTupleTypeInfo.getTypeAt("1"));
+               assertEquals(pojoTypeInfo, pojoInTupleTypeInfo.getTypeAt("f1"));
+
+       }
+
+       public static class MyPojo {
+               public String a;
+               public int b;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 5d0917e..ae47fd3 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -187,7 +187,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                // test with a simple, string-key first.
                PojoTypeInfo<TestUserClass> pType = 
(PojoTypeInfo<TestUserClass>) type;
                List<FlatFieldDescriptor> result = new 
ArrayList<FlatFieldDescriptor>();
-               pType.getKey("nestedClass.dumm2", 0, result);
+               pType.getFlatFields("nestedClass.dumm2", 0, result);
                int[] fields = new int[1]; // see below
                fields[0] = result.get(0).getPosition();
                TypeComparator<TestUserClass> pojoComp = 
pType.createComparator( fields, new boolean[]{true}, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 94c5461..c1796ee 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -219,31 +219,31 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     this
   }
 
-  def withConstantSet(constantSets: String*) = {
+  def withForwardedFields(forwardedFields: String*) = {
     javaSet match {
-      case op: SingleInputUdfOperator[_, _, _] => 
op.withConstantSet(constantSets: _*)
+      case op: SingleInputUdfOperator[_, _, _] => 
op.withForwardedFields(forwardedFields: _*)
       case _ =>
-        throw new UnsupportedOperationException("Cannot specify constant sets 
on Operator " +
+        throw new UnsupportedOperationException("Cannot specify forwarded 
fields for Operator " +
           javaSet.toString + ".")
     }
     this
   }
 
-  def withConstantSetFirst(constantSets: String*) = {
+  def withForwardedFieldsFirst(forwardedFields: String*) = {
     javaSet match {
-      case op: TwoInputUdfOperator[_, _, _, _] => 
op.withConstantSetFirst(constantSets: _*)
+      case op: TwoInputUdfOperator[_, _, _, _] => 
op.withForwardedFieldsFirst(forwardedFields: _*)
       case _ =>
-        throw new UnsupportedOperationException("Cannot specify constant sets 
on Operator " +
+        throw new UnsupportedOperationException("Cannot specify forwarded 
fields for Operator " +
           javaSet.toString + ".")
     }
     this
   }
 
-  def withConstantSetSecond(constantSets: String*) = {
+  def withForwardedFieldsSecond(forwardedFields: String*) = {
     javaSet match {
-      case op: TwoInputUdfOperator[_, _, _, _] => 
op.withConstantSetSecond(constantSets: _*)
+      case op: TwoInputUdfOperator[_, _, _, _] => 
op.withForwardedFieldsSecond(forwardedFields: _*)
       case _ =>
-        throw new UnsupportedOperationException("Cannot specify constant sets 
on Operator " +
+        throw new UnsupportedOperationException("Cannot specify forwarded 
fields for Operator " +
           javaSet.toString + ".")
     }
     this

Reply via email to