http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 7c80bf8..362cf9b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -20,395 +20,558 @@
 package org.apache.flink.api.java.functions;
 
 import java.lang.annotation.Annotation;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsExcept;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecondExcept;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import 
org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsSecond;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 public class SemanticPropUtil {
 
-       private final static String REGEX_LIST = 
"(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))";
-       private final static String REGEX_FORWARD = "(\\s*(\\d+)\\s*->(" + 
REGEX_LIST + "|(\\*)))";
-       private final static String REGEX_LIST_OR_FORWARD = "(" + REGEX_LIST + 
"|" + REGEX_FORWARD + ")";
-       private final static String REGEX_ANNOTATION = "(\\s*(" + 
REGEX_LIST_OR_FORWARD + "\\s*;\\s*)*(" + REGEX_LIST_OR_FORWARD + "\\s*))";
+       private final static String REGEX_WILDCARD = "[\\"+ 
Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ 
Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
+       private final static String REGEX_SINGLE_FIELD = 
"[\\p{L}\\p{Digit}_\\$]+";
+       private final static String REGEX_NESTED_FIELDS = "((" + 
REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\."+ REGEX_WILDCARD 
+")?";
 
+       private final static String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + 
";)*(" + REGEX_NESTED_FIELDS + ");?)";
+       private final static String REGEX_FORWARD = "(("+ REGEX_NESTED_FIELDS 
+"|"+ REGEX_WILDCARD +")->(" + REGEX_NESTED_FIELDS + "|"+ REGEX_WILDCARD +"))";
+       private final static String REGEX_FIELD_OR_FORWARD = "(" + 
REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")";
+       private final static String REGEX_ANNOTATION = "((" + 
REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)";
+
+       private static final Pattern PATTERN_WILDCARD = 
Pattern.compile(REGEX_WILDCARD);
        private static final Pattern PATTERN_FORWARD = 
Pattern.compile(REGEX_FORWARD);
        private static final Pattern PATTERN_ANNOTATION = 
Pattern.compile(REGEX_ANNOTATION);
        private static final Pattern PATTERN_LIST = Pattern.compile(REGEX_LIST);
+       private static final Pattern PATTERN_FIELD = 
Pattern.compile(REGEX_NESTED_FIELDS);
+
+       /**
+        * Builds the semantic properties of a projection applied to a single composite input.
+        *
+        * <p>Each projected top-level field is expanded by its {@code getTotalFields()} count and
+        * one {@code addForwardedField(source, target)} entry is registered per expanded position.
+        * Target positions are assigned consecutively in the order given by {@code fields}.
+        *
+        * @param fields positions of the projected top-level fields of {@code inType}, in output order
+        * @param inType composite type of the input
+        * @return the properties holding the registered forwarded-field entries
+        */
+       public static SingleInputSemanticProperties 
createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType)
+       {
 
-       private static final Pattern PATTERN_DIGIT = Pattern.compile("\\d+");
 
-       public static SingleInputSemanticProperties 
createProjectionPropertiesSingle(int[] fields) {
                SingleInputSemanticProperties ssp = new 
SingleInputSemanticProperties();
-               for (int i = 0; i < fields.length; i++) {
-                       ssp.addForwardedField(fields[i], i);
+
+               // sourceOffsets[k] is the sum of getTotalFields() over the top-level fields before k.
+               // Entry 0 keeps the array default of 0, so no explicit write is needed and an
+               // input of arity 0 yields an empty array instead of an index error.
+               int[] sourceOffsets = new int[inType.getArity()];
+               for(int i=1; i<sourceOffsets.length; i++) {
+                       sourceOffsets[i] = 
inType.getTypeAt(i-1).getTotalFields() + sourceOffsets[i-1];
+               }
+
+               int targetOffset = 0;
+               for(int i=0; i<fields.length; i++) {
+                       int sourceOffset = sourceOffsets[fields[i]];
+                       int numFieldsToCopy = 
inType.getTypeAt(fields[i]).getTotalFields();
+
+                       for(int j=0; j<numFieldsToCopy; j++) {
+                               ssp.addForwardedField(sourceOffset+j, 
targetOffset+j);
+                       }
+                       // the next projected field continues right after the positions just written
+                       targetOffset += numFieldsToCopy;
                }
+
                return ssp;
        }
 
-       public static DualInputSemanticProperties 
createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) {
+       /**
+        * Builds the semantic properties of a projection that draws its fields from two inputs.
+        *
+        * <p>For every entry {@code i}, {@code isFromFirst[i]} selects the input and
+        * {@code fields[i]} the top-level field of that input; the value {@code -1} selects the
+        * whole input (all {@code getTotalFields()} positions starting at 0). One
+        * {@code addForwardedField(input, source, target)} entry is registered per copied position,
+        * with target positions assigned consecutively.
+        *
+        * @param fields top-level field position per projected entry, or {@code -1} for the whole input
+        * @param isFromFirst per entry, {@code true} to read from the first input, {@code false} for the second
+        * @param inType1 type information of the first input
+        * @param inType2 type information of the second input
+        * @return the properties holding the registered forwarded-field entries
+        */
+       public static DualInputSemanticProperties 
createProjectionPropertiesDual(
+                       int[] fields, boolean[] isFromFirst, TypeInformation<?> 
inType1, TypeInformation<?> inType2)
+       {
                DualInputSemanticProperties dsp = new 
DualInputSemanticProperties();
 
-               for (int i = 0; i < fields.length; i++) {
-                       if (isFromFirst[i]) {
-                               dsp.addForwardedField1(fields[i], i);
+               // Offsets of the top-level fields of the first input. Entry 0 keeps the array
+               // default of 0, so a tuple type of arity 0 yields an empty array instead of failing.
+               int[] sourceOffsets1;
+               if(inType1 instanceof TupleTypeInfo<?>) {
+                       sourceOffsets1 = new int[inType1.getArity()];
+                       for(int i=1; i<sourceOffsets1.length; i++) {
+                               sourceOffsets1[i] = 
((TupleTypeInfo<?>)inType1).getTypeAt(i-1).getTotalFields() + 
sourceOffsets1[i-1];
+                       }
+               } else {
+                       sourceOffsets1 = new int[] {0};
+               }
+
+               // Same computation for the second input.
+               int[] sourceOffsets2;
+               if(inType2 instanceof TupleTypeInfo<?>) {
+                       sourceOffsets2 = new int[inType2.getArity()];
+                       for(int i=1; i<sourceOffsets2.length; i++) {
+                               sourceOffsets2[i] = 
((TupleTypeInfo<?>)inType2).getTypeAt(i-1).getTotalFields() + 
sourceOffsets2[i-1];
+                       }
+               } else {
+                       sourceOffsets2 = new int[] {0};
+               }
+
+               // NOTE(review): for an input that is not a TupleTypeInfo only fields[i] == -1 is
+               // handled; any other index reaches the TupleTypeInfo cast below - confirm callers
+               // never pass one.
+               int targetOffset = 0;
+               for(int i=0; i<fields.length; i++) {
+                       int sourceOffset;
+                       int numFieldsToCopy;
+                       int input;
+                       if(isFromFirst[i]) {
+                               input = 0;
+                               if (fields[i] == -1) {
+                                       sourceOffset = 0;
+                                       numFieldsToCopy = 
inType1.getTotalFields();
+                               } else {
+                                       sourceOffset = 
sourceOffsets1[fields[i]];
+                                       numFieldsToCopy = 
((TupleTypeInfo<?>)inType1).getTypeAt(fields[i]).getTotalFields();
+                               }
                        } else {
-                               dsp.addForwardedField2(fields[i], i);
+                               input = 1;
+                               if (fields[i] == -1) {
+                                       sourceOffset = 0;
+                                       numFieldsToCopy = 
inType2.getTotalFields();
+                               } else {
+                                       sourceOffset = 
sourceOffsets2[fields[i]];
+                                       numFieldsToCopy = 
((TupleTypeInfo<?>)inType2).getTypeAt(fields[i]).getTotalFields();
+                               }
+                       }
+
+                       for(int j=0; j<numFieldsToCopy; j++) {
+                               dsp.addForwardedField(input, sourceOffset+j, 
targetOffset+j);
                        }
+                       targetOffset += numFieldsToCopy;
                }
+
                return dsp;
        }
 
-       public static SingleInputSemanticProperties 
getSemanticPropsSingle(Set<Annotation> set, TypeInformation<?> inType, 
TypeInformation<?> outType) {
+       /**
+        * Derives the semantic properties of a single-input function from its annotations.
+        *
+        * <p>The values of the {@code ForwardedFields}, {@code NonForwardedFields} and
+        * {@code ReadFields} annotations are collected and, if at least one of them is present,
+        * passed on to {@code getSemanticPropsSingleFromString}.
+        *
+        * @param set annotations of the function; may be {@code null}
+        * @param inType type information of the input
+        * @param outType type information of the output
+        * @return the derived properties; an empty instance if {@code set} is {@code null} or
+        *         carries none of the three annotations
+        * @throws InvalidSemanticAnnotationException if {@code set} contains one of the
+        *         {@code ...First} / {@code ...Second} annotations
+        */
+       public static SingleInputSemanticProperties getSemanticPropsSingle(
+                       Set<Annotation> set, TypeInformation<?> inType, 
TypeInformation<?> outType) {
                if (set == null) {
-                       return null;
+                       return new SingleInputSemanticProperties();
                }
                Iterator<Annotation> it = set.iterator();
-               SingleInputSemanticProperties result = null;
 
-               //non tuple types are not yet supported for annotations
-               if (!inType.isTupleType() || !outType.isTupleType()) {
-                       return null;
-               }
+               String[] forwarded = null;
+               String[] nonForwarded = null;
+               String[] read = null;
 
                while (it.hasNext()) {
-                       if (result == null) {
-                               result = new SingleInputSemanticProperties();
-                       }
 
                        Annotation ann = it.next();
 
-                       if (ann instanceof ConstantFields) {
-                               ConstantFields cf = (ConstantFields) ann;
-                               parseConstantFields(cf.value(), result, inType, 
outType);
-                       } else if (ann instanceof ConstantFieldsExcept) {
-                               ConstantFieldsExcept cfe = 
(ConstantFieldsExcept) ann;
-                               parseConstantFieldsExcept(cfe.value(), result, 
inType, outType);
+                       if (ann instanceof ForwardedFields) {
+                               forwarded = ((ForwardedFields) ann).value();
+                       } else if (ann instanceof NonForwardedFields) {
+                               nonForwarded = ((NonForwardedFields) 
ann).value();
                        } else if (ann instanceof ReadFields) {
-                               ReadFields rf = (ReadFields) ann;
-                               parseReadFields(rf.value(), result, inType, 
outType);
+                               read = ((ReadFields) ann).value();
+                       } else if (ann instanceof ForwardedFieldsFirst || ann 
instanceof ForwardedFieldsSecond ||
+                                       ann instanceof NonForwardedFieldsFirst 
|| ann instanceof NonForwardedFieldsSecond ||
+                                       ann instanceof ReadFieldsFirst || ann 
instanceof ReadFieldsSecond) {
+                               // annotationType() names the annotation itself; getClass() would
+                               // report the runtime class implementing it instead
+                               throw new 
InvalidSemanticAnnotationException("Annotation "+ann.annotationType()+" invalid for 
single input function.");
                        }
                }
-               return result;
-       }
 
-       private static void parseConstantFields(String[] cf, 
SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (cf == null) {
-                       return;
-               }
-               for (String s : cf) {
-                       if (s != null) {
-                               readConstantSet(sm, s, inType, outType, 0);
-                       }
+               if(forwarded != null || nonForwarded != null || read != null) {
+                       SingleInputSemanticProperties result = new 
SingleInputSemanticProperties();
+                       getSemanticPropsSingleFromString(result, forwarded, 
nonForwarded, read, inType, outType);
+                       return result;
+               } else {
+                       return new SingleInputSemanticProperties();
                }
        }
 
-       private static void readConstantSet(SemanticProperties sp, String s, 
TypeInformation<?> inType, TypeInformation<?> outType, int input) {
-               if (s.equals("*")) {
-                       if (sp instanceof SingleInputSemanticProperties) {
-                               for (int i = 0; i < inType.getArity() && i < 
outType.getArity(); i++) {
-                                       ((SingleInputSemanticProperties) 
sp).addForwardedField(i, i);
-                               }
-                       } else if (sp instanceof DualInputSemanticProperties) {
-                               for (int i = 0; i < inType.getArity() && i < 
outType.getArity(); i++) {
-                                       if (input == 0) {
-                                               ((DualInputSemanticProperties) 
sp).addForwardedField1(i, i);
-                                       } else if (input == 1) {
-                                               ((DualInputSemanticProperties) 
sp).addForwardedField2(i, i);
-                                       }
-                               }
-                       }
-                       return;
+       /**
+        * Derives the semantic properties of a dual-input function from its annotations.
+        *
+        * <p>The values of the {@code ForwardedFieldsFirst/Second},
+        * {@code NonForwardedFieldsFirst/Second} and {@code ReadFieldsFirst/Second} annotations
+        * are collected and, if at least one of them is present, passed on to
+        * {@code getSemanticPropsDualFromString}.
+        *
+        * @param set annotations of the function; may be {@code null}
+        * @param inType1 type information of the first input
+        * @param inType2 type information of the second input
+        * @param outType type information of the output
+        * @return the derived properties; an empty instance if {@code set} is {@code null} or
+        *         carries none of the six annotations
+        * @throws InvalidSemanticAnnotationException if {@code set} contains
+        *         {@code ForwardedFields}, {@code NonForwardedFields} or {@code ReadFields}
+        */
+       public static DualInputSemanticProperties getSemanticPropsDual(
+                       Set<Annotation> set, TypeInformation<?> inType1, 
TypeInformation<?> inType2, TypeInformation<?> outType) {
+               if (set == null) {
+                       return new DualInputSemanticProperties();
                }
+               Iterator<Annotation> it = set.iterator();
 
-               Matcher matcher = PATTERN_ANNOTATION.matcher(s);
+               String[] forwardedFirst = null;
+               String[] forwardedSecond = null;
+               String[] nonForwardedFirst = null;
+               String[] nonForwardedSecond = null;
+               String[] readFirst = null;
+               String[] readSecond = null;
 
-               if (!matcher.matches()) {
-                       throw new InvalidProgramException("Unrecognized 
annotation string format.");
-               }
+               while (it.hasNext()) {
+                       Annotation ann = it.next();
 
-               Matcher forwardMatcher = PATTERN_FORWARD.matcher(s);
-               while (forwardMatcher.find()) {
-                       int sourceField = 
Integer.valueOf(forwardMatcher.group(2));
-                       if (!isValidField(inType, sourceField)) {
-                               throw new 
IndexOutOfBoundsException("Annotation: Field " + sourceField + " not available 
in the input tuple.");
+                       if (ann instanceof ForwardedFieldsFirst) {
+                               forwardedFirst = ((ForwardedFieldsFirst) 
ann).value();
+                       } else if (ann instanceof ForwardedFieldsSecond) {
+                               forwardedSecond = ((ForwardedFieldsSecond) 
ann).value();
+                       } else if (ann instanceof NonForwardedFieldsFirst) {
+                               nonForwardedFirst = ((NonForwardedFieldsFirst) 
ann).value();
+                       } else if (ann instanceof NonForwardedFieldsSecond) {
+                               nonForwardedSecond = 
((NonForwardedFieldsSecond) ann).value();
+                       } else if (ann instanceof ReadFieldsFirst) {
+                               readFirst = ((ReadFieldsFirst) ann).value();
+                       } else if (ann instanceof ReadFieldsSecond) {
+                               readSecond = ((ReadFieldsSecond) ann).value();
+                       } else if (ann instanceof ForwardedFields || ann 
instanceof NonForwardedFields || ann instanceof ReadFields) {
+                               // annotationType() names the annotation itself; getClass() would
+                               // report the runtime class implementing it instead
+                               throw new 
InvalidSemanticAnnotationException("Annotation " + ann.annotationType() + " invalid 
for dual input function.");
                        }
+               }
 
-                       if (forwardMatcher.group(7) != null) {
-                               if (sp instanceof 
SingleInputSemanticProperties) {
-                                       for (int i = 0; i < outType.getArity(); 
i++) {
-                                               
((SingleInputSemanticProperties) sp).addForwardedField(sourceField, i);
-                                       }
-                               } else if (sp instanceof 
DualInputSemanticProperties) {
-                                       for (int i = 0; i < outType.getArity(); 
i++) {
-                                               if (input == 0) {
-                                                       
((DualInputSemanticProperties) sp).addForwardedField1(sourceField, i);
-                                               } else if (input == 1) {
-                                                       
((DualInputSemanticProperties) sp).addForwardedField2(sourceField, i);
-                                               }
-                                       }
-                               }
-                               continue;
-                       }
-                       String found = forwardMatcher.group(4);
-                       FieldSet fs = readFieldSetFromString(found, inType, 
outType);
+               if(forwardedFirst != null || nonForwardedFirst != null || 
readFirst != null ||
+                               forwardedSecond != null ||nonForwardedSecond != 
null || readSecond != null) {
+                       DualInputSemanticProperties result = new 
DualInputSemanticProperties();
+                       getSemanticPropsDualFromString(result, forwardedFirst, 
forwardedSecond,
+                                       nonForwardedFirst, nonForwardedSecond, 
readFirst, readSecond, inType1, inType2, outType);
+                       return result;
+               } else {
+                       return new DualInputSemanticProperties();
+               }
+       }
 
-                       if (sp instanceof SingleInputSemanticProperties) {
-                               ((SingleInputSemanticProperties) 
sp).addForwardedField(sourceField, fs);
-                       } else if (sp instanceof DualInputSemanticProperties) {
-                               if (input == 0) {
-                                       ((DualInputSemanticProperties) 
sp).addForwardedField1(sourceField, fs);
-                               } else if (input == 1) {
-                                       ((DualInputSemanticProperties) 
sp).addForwardedField2(sourceField, fs);
-                               }
-                       }
+       public static void 
getSemanticPropsSingleFromString(SingleInputSemanticProperties result,
+                       String[] forwarded, String[] nonForwarded, String[] 
readSet,
+                       TypeInformation<?> inType, TypeInformation<?> outType)
+       {
+
+               boolean hasForwardedAnnotation = false;
+               boolean hasNonForwardedAnnotation = false;
+               // check for forwarded annotations
+               if(forwarded != null && forwarded.length > 0) {
+                       hasForwardedAnnotation = true;
                }
-               s = forwardMatcher.replaceAll("");
-
-               Matcher listMatcher = PATTERN_LIST.matcher(s);
-
-               while (listMatcher.find()) {
-                       String list = listMatcher.group();
-                       FieldSet fs = readFieldSetFromString(list, inType, 
outType);
-                       for (int i : fs) {
-                               if (sp instanceof 
SingleInputSemanticProperties) {
-                                       ((SingleInputSemanticProperties) 
sp).addForwardedField(i, i);
-                               } else if (sp instanceof 
DualInputSemanticProperties) {
-                                       if (input == 0) {
-                                               ((DualInputSemanticProperties) 
sp).addForwardedField1(i, i);
-                                       } else if (input == 1) {
-                                               ((DualInputSemanticProperties) 
sp).addForwardedField2(i, i);
-                                       }
-                               }
-                       }
+               // check non-forwarded annotations
+               if(nonForwarded != null && nonForwarded.length > 0) {
+                       hasNonForwardedAnnotation = true;
                }
+
+               if(hasForwardedAnnotation && hasNonForwardedAnnotation) {
+                       throw new InvalidSemanticAnnotationException("Either 
ForwardedFields OR " +
+                                       "NonForwardedFields annotation 
permitted, NOT both.");
+               } else if(hasForwardedAnnotation) {
+                       parseForwardedFields(result, forwarded, inType, 
outType, 0);
+               } else if(hasNonForwardedAnnotation) {
+                       parseNonForwardedFields(result, nonForwarded, inType, 
outType, 0);
+               }
+               parseReadFields(result, readSet, inType, 0);
        }
 
-       private static void parseConstantFieldsFirst(String[] cff, 
DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (cff == null) {
-                       return;
+       public static void 
getSemanticPropsDualFromString(DualInputSemanticProperties result,
+                       String[] forwardedFirst, String[] forwardedSecond,
+                       String[] nonForwardedFirst, String[] 
nonForwardedSecond, String[]
+                       readFieldsFirst, String[] readFieldsSecond,
+                       TypeInformation<?> inType1, TypeInformation<?> inType2, 
TypeInformation<?> outType)
+       {
+
+               boolean hasForwardedFirstAnnotation = false;
+               boolean hasForwardedSecondAnnotation = false;
+               boolean hasNonForwardedFirstAnnotation = false;
+               boolean hasNonForwardedSecondAnnotation = false;
+               // check for forwarded annotations
+               if(forwardedFirst != null && forwardedFirst.length > 0) {
+                       hasForwardedFirstAnnotation = true;
+               }
+               if(forwardedSecond != null && forwardedSecond.length > 0) {
+                       hasForwardedSecondAnnotation = true;
+               }
+               // check non-forwarded annotations
+               if(nonForwardedFirst != null && nonForwardedFirst.length > 0) {
+                       hasNonForwardedFirstAnnotation = true;
+               }
+               if(nonForwardedSecond != null && nonForwardedSecond.length > 0) 
{
+                       hasNonForwardedSecondAnnotation = true;
                }
 
-               for (String s : cff) {
-                       if (s != null) {
-                               readConstantSet(dm, s, inType, outType, 0);
-                       }
+               if(hasForwardedFirstAnnotation && 
hasNonForwardedFirstAnnotation) {
+                       throw new InvalidSemanticAnnotationException("Either 
ForwardedFieldsFirst OR " +
+                                       "NonForwardedFieldsFirst annotation 
permitted, NOT both.");
+               }
+               if(hasForwardedSecondAnnotation && 
hasNonForwardedSecondAnnotation) {
+                       throw new InvalidSemanticAnnotationException("Either 
ForwardedFieldsSecond OR " +
+                                       "NonForwardedFieldsSecond annotation 
permitted, NOT both.");
                }
-       }
 
-       private static void parseConstantFieldsSecond(String[] cfs, 
DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (cfs == null) {
-                       return;
+               if(hasForwardedFirstAnnotation) {
+                       parseForwardedFields(result, forwardedFirst, inType1, 
outType, 0);
+               } else if(hasNonForwardedFirstAnnotation) {
+                       parseNonForwardedFields(result, nonForwardedFirst, 
inType1, outType, 0);
                }
 
-               for (String s : cfs) {
-                       if (s != null) {
-                               readConstantSet(dm, s, inType, outType, 1);
-                       }
+               if(hasForwardedSecondAnnotation) {
+                       parseForwardedFields(result, forwardedSecond, inType2, 
outType, 1);
+               } else if(hasNonForwardedSecondAnnotation) {
+                       parseNonForwardedFields(result, nonForwardedSecond, 
inType2, outType, 1);
                }
+
+               parseReadFields(result, readFieldsFirst, inType1, 0);
+               parseReadFields(result, readFieldsSecond, inType2, 1);
        }
 
-       private static void parseConstantFieldsFirstExcept(String[] cffe, 
DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (cffe == null) {
+
+       private static void parseForwardedFields(SemanticProperties sp, 
String[] forwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, 
int input) {
+
+               if (forwardedStr == null) {
                        return;
                }
 
-               for (String str : cffe) {
-                       if (str == null) {
+               for (String s : forwardedStr) {
+                       if (s == null) {
                                continue;
                        }
 
-                       FieldSet fs = readFieldSetFromString(str, inType, 
outType);
+                       // remove whitespace characters
+                       s = s.replaceAll("\\s", "");
+
+                       Matcher wcMatcher = PATTERN_WILDCARD.matcher(s);
+                       // simple wildcard
+                       if (wcMatcher.matches()) {
 
-                       for (int i = 0; i < outType.getArity(); i++) {
-                               if (!fs.contains(i)) {
-                                       dm.addForwardedField1(i, i);
+                               if (!inType.equals(outType)) {
+                                       throw new 
InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
+                                                       "\" with wildcard only 
allowed for identical input and output types.");
                                }
-                       }
-               }
-       }
 
-       private static void parseConstantFieldsSecondExcept(String[] cfse, 
DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (cfse == null) {
-                       return;
-               }
+                               for (int i = 0; i < inType.getTotalFields(); 
i++) {
+                                       if (sp instanceof 
SingleInputSemanticProperties) {
+                                               
((SingleInputSemanticProperties) sp).addForwardedField(i, i);
+                                       } else if (sp instanceof 
DualInputSemanticProperties) {
+                                               ((DualInputSemanticProperties) 
sp).addForwardedField(input, i, i);
+                                       }
+                               }
+                               return;
+                       }
 
-               for (String str : cfse) {
-                       if (str == null) {
-                               continue;
+                       // check format of annotation string
+                       Matcher matcher = PATTERN_ANNOTATION.matcher(s);
+                       if (!matcher.matches()) {
+                               throw new 
InvalidSemanticAnnotationException("Invalid format of forwarded field 
annotation \"" + s + "\".");
                        }
 
-                       FieldSet fs = readFieldSetFromString(str, inType, 
outType);
+                       // add forward annotations "->"
+                       Matcher forwardMatcher = PATTERN_FORWARD.matcher(s);
+                       while (forwardMatcher.find()) {
+                               String sourceStr = forwardMatcher.group(2);
+                               String targetStr = forwardMatcher.group(6);
 
-                       for (int i = 0; i < outType.getArity(); i++) {
-                               if (!fs.contains(i)) {
-                                       dm.addForwardedField2(i, i);
+                               try {
+                                       // check type compatibility
+                                       if (!areFieldsCompatible(sourceStr, 
inType, targetStr, outType)) {
+                                               throw new 
InvalidSemanticAnnotationException("Referenced fields of forwarded field 
annotation \"" + s + "\" do not match.");
+                                       }
+                                       List<FlatFieldDescriptor> inFFDs = 
getFlatFields(sourceStr, inType);
+                                       List<FlatFieldDescriptor> outFFDs = 
getFlatFields(targetStr, outType);
+                                       if (sp instanceof 
SingleInputSemanticProperties) {
+                                               for (int i = 0; i < 
inFFDs.size(); i++) {
+                                                       int sourceField = 
inFFDs.get(i).getPosition();
+                                                       int targetField = 
outFFDs.get(i).getPosition();
+                                                       
((SingleInputSemanticProperties) sp).addForwardedField(sourceField, 
targetField);
+                                               }
+                                       } else if (sp instanceof 
DualInputSemanticProperties) {
+                                               for (int i = 0; i < 
inFFDs.size(); i++) {
+                                                       int sourceField = 
inFFDs.get(i).getPosition();
+                                                       int targetField = 
outFFDs.get(i).getPosition();
+                                                       
((DualInputSemanticProperties) sp).addForwardedField(input, sourceField, 
targetField);
+                                               }
+                                       }
+                               } catch (InvalidFieldReferenceException ifre) {
+                                       throw new 
InvalidSemanticAnnotationException("Invalid field reference in forwarded field 
annotation \"" + sourceStr + "->" + targetStr + "\".", ifre);
+                               } catch (InvalidSemanticAnnotationException 
isae) {
+                                       throw new 
InvalidSemanticAnnotationException("Forwarded field annotation \"" + sourceStr 
+ "->" + targetStr + "\" could not be added.", isae);
+                               }
+                       }
+                       // remove forward annotations
+                       s = forwardMatcher.replaceAll("");
+
+                       // add forwarded annotations
+                       Matcher listMatcher = PATTERN_LIST.matcher(s);
+                       while (listMatcher.find()) {
+                               String list = listMatcher.group();
+                               Matcher fieldMatcher = 
PATTERN_FIELD.matcher(list);
+
+                               // for each nested field
+                               while (fieldMatcher.find()) {
+                                       String fieldStr = fieldMatcher.group();
+                                       try {
+                                               // check if field is compatible 
in input and output type
+                                               if 
(!areFieldsCompatible(fieldStr, inType, fieldStr, outType)) {
+                                                       throw new 
InvalidSemanticAnnotationException("Referenced fields of forwarded field 
annotation \"" + s + "\" do not match.");
+                                               }
+                                               // add flat field positions
+                                               List<FlatFieldDescriptor> 
inFFDs = getFlatFields(fieldStr, inType);
+                                               List<FlatFieldDescriptor> 
outFFDs = getFlatFields(fieldStr, outType);
+                                               for (int i = 0; i < 
inFFDs.size(); i++) {
+                                                       int sourcePos = 
inFFDs.get(i).getPosition();
+                                                       int targetPos = 
outFFDs.get(i).getPosition();
+                                                       if (sp instanceof 
SingleInputSemanticProperties) {
+                                                               
((SingleInputSemanticProperties) sp).addForwardedField(sourcePos, targetPos);
+                                                       } else if (sp 
instanceof DualInputSemanticProperties) {
+                                                               
((DualInputSemanticProperties) sp).addForwardedField(input, sourcePos, 
targetPos);
+                                                       }
+                                               }
+                                       } catch (InvalidFieldReferenceException 
ifre) {
+                                               throw new 
InvalidSemanticAnnotationException("Invalid field reference in forwarded field 
annotation \"" + fieldStr + "\".", ifre);
+                                       } catch 
(InvalidSemanticAnnotationException isae) {
+                                               throw new 
InvalidSemanticAnnotationException("Forwarded field annotation \"" + fieldStr + 
"\" could not be added.", isae);
+                                       }
                                }
                        }
                }
        }
 
-       private static void parseReadFieldsFirst(String[] rf, 
DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (rf == null) {
+       private static void parseNonForwardedFields(
+                       SemanticProperties sp, String[] nonForwardedStr, 
TypeInformation<?> inType, TypeInformation<?> outType, int input) {
+
+               if(nonForwardedStr == null) {
                        return;
                }
 
-               for (String str : rf) {
-                       if (str != null) {
-                               FieldSet fs = readFieldSetFromString(str, 
inType, outType);
-                               dm.addReadFields1(fs);
-                       }
-               }
-       }
+               FieldSet excludedFields = new FieldSet();
+               for(String s : nonForwardedStr) {
 
-       private static void parseReadFieldsSecond(String[] rf, 
DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (rf == null) {
-                       return;
-               }
+                       // remove whitespace characters
+                       s = s.replaceAll("\\s", "");
 
-               for (String str : rf) {
-                       if (str != null) {
-                               FieldSet fs = readFieldSetFromString(str, 
inType, outType);
-                               dm.addReadFields2(fs);
+                       if (s.equals("")) {
+                               continue;
                        }
-               }
-       }
-
 
-       private static boolean isValidField(TypeInformation<?> type, int field) 
{
-               return field >= 0 && field < type.getArity();
-       }
+                       if(!inType.equals(outType)) {
+                               throw new 
InvalidSemanticAnnotationException("Non-forwarded fields annotation only 
allowed for identical input and output types.");
+                       }
 
-       private static void parseConstantFieldsExcept(String[] cfe, 
SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (cfe == null) {
-                       return;
-               }
+                       Matcher matcher = PATTERN_LIST.matcher(s);
+                       if (!matcher.matches()) {
+                               throw new 
InvalidSemanticAnnotationException("Invalid format of non-forwarded fields 
annotation \""+s+"\".");
+                       }
 
-               for (String str : cfe) {
-                       if (str != null) {
-                               FieldSet fs = readFieldSetFromString(str, 
inType, outType);
+                       // process individual fields
+                       matcher = PATTERN_FIELD.matcher(s);
+                       while (matcher.find()) {
+                               String fieldStr = matcher.group();
 
-                               for (int i = 0; i < outType.getArity(); i++) {
-                                       if (!fs.contains(i)) {
-                                               sm.addForwardedField(i, i);
+                               try {
+                                       // get and add all flat field positions
+                                       List<FlatFieldDescriptor> inFFDs = 
getFlatFields(fieldStr, inType);
+                                       for (FlatFieldDescriptor ffd : inFFDs) {
+                                               excludedFields = 
excludedFields.addField(ffd.getPosition());
                                        }
+                               } catch(InvalidFieldReferenceException ifre) {
+                                       throw new 
InvalidSemanticAnnotationException("Invalid field reference in non-forwarded 
fields annotation \""+fieldStr+"\".",ifre);
                                }
                        }
                }
-       }
 
-       private static FieldSet readFieldSetFromString(String s, 
TypeInformation<?> inType, TypeInformation<?> outType) {
-               Matcher matcher = PATTERN_LIST.matcher(s);
-
-               if (!matcher.matches()) {
-                       throw new InvalidProgramException("Unrecognized 
annotation string format.");
-               }
-
-               matcher = PATTERN_DIGIT.matcher(s);
-               FieldSet fs = FieldSet.EMPTY_SET;
-
-               while (matcher.find()) {
-                       int field = Integer.valueOf(matcher.group());
-                       if (!isValidField(outType, field)) {
-                               throw new 
IndexOutOfBoundsException("Annotation: Field " + field + " not available in the 
output tuple.");
-                       }
-                       if (!isValidField(inType, field)) {
-                               throw new 
IndexOutOfBoundsException("Annotation: Field " + field + " not available in the 
input tuple.");
+               for(int i = 0; i < inType.getTotalFields(); i++) {
+                       if(!excludedFields.contains(i)) {
+                               if(sp instanceof SingleInputSemanticProperties) 
{
+                                       ((SingleInputSemanticProperties) 
sp).addForwardedField(i,i);
+                               } else if(sp instanceof 
DualInputSemanticProperties) {
+                                       ((DualInputSemanticProperties) 
sp).addForwardedField(input, i, i);
+                               }
                        }
-                       
-                       fs = fs.addField(field);
                }
-               return fs;
+
        }
 
-       private static void parseReadFields(String[] rf, 
SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> 
outType) {
-               if (rf == null) {
+       private static void parseReadFields(SemanticProperties sp, String[] 
readFieldStrings, TypeInformation<?> inType, int input) {
+
+               if(readFieldStrings == null) {
                        return;
                }
 
-               for (String str : rf) {
-                       if (str != null) {
-                               FieldSet fs = readFieldSetFromString(str, 
inType, outType);
-                               sm.addReadFields(fs);
+               for(String s : readFieldStrings) {
+
+                       FieldSet readFields = new FieldSet();
+
+                       // remove whitespace characters
+                       s = s.replaceAll("\\s", "");
+
+                       Matcher wcMatcher = PATTERN_WILDCARD.matcher(s);
+                       // simple wildcard
+                       if (wcMatcher.matches()) {
+                               // add all fields
+                               for (int i = 0; i < inType.getTotalFields(); 
i++) {
+                                       readFields = readFields.addField(i);
+                               }
+                       } else {
+                               // process field list
+                               Matcher matcher = PATTERN_LIST.matcher(s);
+
+                               if (!matcher.matches()) {
+                                       throw new 
InvalidSemanticAnnotationException("Invalid format of read field annotation \"" 
+ s + "\".");
+                               }
+
+                               // process field
+                               matcher = PATTERN_FIELD.matcher(s);
+                               while (matcher.find()) {
+                                       String fieldStr = matcher.group();
+                                       try {
+                                               List<FlatFieldDescriptor> ffds 
= getFlatFields(fieldStr, inType);
+                                               // get and add flat field 
positions
+                                               for (FlatFieldDescriptor ffd : 
ffds) {
+                                                       readFields = 
readFields.addField(ffd.getPosition());
+                                               }
+                                       } catch (InvalidFieldReferenceException 
ifre) {
+                                               throw new 
InvalidSemanticAnnotationException("Invalid field reference in read field 
annotation \"" + fieldStr + "\".", ifre);
+
+                                       }
+                               }
                        }
-               }
-       }
 
-       public static SingleInputSemanticProperties 
getSemanticPropsSingleFromString(String[] constantSet, String[] 
constantSetExcept, String[] readSet, TypeInformation<?> inType, 
TypeInformation<?> outType) {
-               SingleInputSemanticProperties result = new 
SingleInputSemanticProperties();
-               parseConstantFields(constantSet, result, inType, outType);
-               parseConstantFieldsExcept(constantSetExcept, result, inType, 
outType);
-               parseReadFields(readSet, result, inType, outType);
-               return result;
+                       if (sp instanceof SingleInputSemanticProperties) {
+                               ((SingleInputSemanticProperties) 
sp).addReadFields(readFields);
+                       } else if (sp instanceof DualInputSemanticProperties) {
+                               ((DualInputSemanticProperties) 
sp).addReadFields(input, readFields);
+                       }
+               }
        }
 
-       public static void 
getSemanticPropsDualFromString(DualInputSemanticProperties target, String[] 
constantSetFirst, String[] constantSetSecond, String[] constantSetFirstExcept,
-                                                       String[] 
constantSetSecondExcept, String[] readFieldsFirst, String[] readFieldsSecond, 
TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> 
outType)
-       {
-               parseConstantFieldsFirst(constantSetFirst, target, inType1, 
outType);
-               parseConstantFieldsSecond(constantSetSecond, target, inType2, 
outType);
-               parseConstantFieldsFirstExcept(constantSetFirstExcept, target, 
inType1, outType);
-               parseConstantFieldsSecondExcept(constantSetSecondExcept, 
target, inType2, outType);
-               parseReadFieldsFirst(readFieldsFirst, target, inType1, outType);
-               parseReadFieldsSecond(readFieldsSecond, target, inType2, 
outType);
-       }
+       ////////////////////// UTIL METHODS ///////////////////////////////
 
-       public static DualInputSemanticProperties 
getSemanticPropsDual(Set<Annotation> set, TypeInformation<?> inType1, 
TypeInformation<?> inType2, TypeInformation<?> outType) {
-               if (set == null) {
-                       return null;
-               }
+       private static boolean areFieldsCompatible(String sourceField, 
TypeInformation<?> inType, String targetField, TypeInformation<?> outType) {
 
-               Iterator<Annotation> it = set.iterator();
-               DualInputSemanticProperties result = null;
+               // get source type information
+               TypeInformation<?> sourceType = 
getExpressionTypeInformation(sourceField, inType);
+               // get target type information
+               TypeInformation<?> targetType = 
getExpressionTypeInformation(targetField, outType);
 
-               //non tuple types are not yet supported for annotations
-               if (!inType1.isTupleType() || !inType2.isTupleType() || 
!outType.isTupleType()) {
-                       return null;
-               }
+               return (sourceType.equals(targetType));
+       }
 
-               while (it.hasNext()) {
-                       if (result == null) {
-                               result = new DualInputSemanticProperties();
+       private static TypeInformation<?> getExpressionTypeInformation(String 
fieldStr, TypeInformation<?> typeInfo) {
+               Matcher wildcardMatcher = PATTERN_WILDCARD.matcher(fieldStr);
+               if(wildcardMatcher.matches()) {
+                       return typeInfo;
+               } else {
+                       Matcher expMatcher = PATTERN_FIELD.matcher(fieldStr);
+                       if(!expMatcher.matches()) {
+                               throw new 
InvalidFieldReferenceException("Invalid field expression \""+fieldStr+"\".");
                        }
+                       if(typeInfo instanceof CompositeType<?>) {
+                               return ((CompositeType) 
typeInfo).getTypeAt(expMatcher.group(1));
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not 
possible on atomic type ("+typeInfo+").");
+                       }
+               }
+       }
 
-                       Annotation ann = it.next();
-
-                       if (ann instanceof ConstantFieldsFirst) {
-                               ConstantFieldsFirst cff = (ConstantFieldsFirst) 
ann;
-                               parseConstantFieldsFirst(cff.value(), result, 
inType1, outType);
-                       } else if (ann instanceof ConstantFieldsSecond) {
-                               ConstantFieldsSecond cfs = 
(ConstantFieldsSecond) ann;
-                               parseConstantFieldsSecond(cfs.value(), result, 
inType2, outType);
-                       } else if (ann instanceof ConstantFieldsFirstExcept) {
-                               ConstantFieldsFirstExcept cffe = 
(ConstantFieldsFirstExcept) ann;
-                               parseConstantFieldsFirstExcept(cffe.value(), 
result, inType1, outType);
-                       } else if (ann instanceof ConstantFieldsSecondExcept) {
-                               ConstantFieldsSecondExcept cfse = 
(ConstantFieldsSecondExcept) ann;
-                               parseConstantFieldsSecondExcept(cfse.value(), 
result, inType2, outType);
-                       } else if (ann instanceof ReadFieldsFirst) {
-                               ReadFieldsFirst rff = (ReadFieldsFirst) ann;
-                               parseReadFieldsFirst(rff.value(), result, 
inType1, outType);
-                       } else if (ann instanceof ReadFieldsSecond) {
-                               ReadFieldsSecond rfs = (ReadFieldsSecond) ann;
-                               parseReadFieldsSecond(rfs.value(), result, 
inType2, outType);
+       private static List<FlatFieldDescriptor> getFlatFields(String fieldStr, 
TypeInformation<?> typeInfo) {
+               if(typeInfo instanceof CompositeType<?>) {
+                       return ((CompositeType) 
typeInfo).getFlatFields(fieldStr);
+               } else {
+                       Matcher wildcardMatcher = 
PATTERN_WILDCARD.matcher(fieldStr);
+                       if(wildcardMatcher.matches()) {
+                               return Collections.singletonList(new 
FlatFieldDescriptor(0, typeInfo));
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not 
possible on atomic type ("+typeInfo+").");
                        }
                }
-               return result;
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 2411021..9ea28f7 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -68,7 +68,7 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
                this.defaultName = defaultName;
                this.hint = hint;
        }
-       
+
        @Override
        protected CrossFunction<I1, I2, OUT> getFunction() {
                return function;
@@ -322,19 +322,20 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
                }
 
                @Override
-               public CrossOperator<I1, I2, OUT> 
withConstantSetFirst(String... constantSetFirst) {
-                       throw new InvalidProgramException("The semantic 
properties (constant fields and forwarded fields) are automatically 
calculated.");
+               public CrossOperator<I1, I2, OUT> 
withForwardedFieldsFirst(String... forwardedFieldsFirst) {
+                       throw new InvalidProgramException("The semantic 
properties (forwarded fields) are automatically calculated.");
                }
 
                @Override
-               public CrossOperator<I1, I2, OUT> 
withConstantSetSecond(String... constantSetSecond) {
-                       throw new InvalidProgramException("The semantic 
properties (constant fields and forwarded fields) are automatically 
calculated.");
+               public CrossOperator<I1, I2, OUT> 
withForwardedFieldsSecond(String... forwardedFieldsSecond) {
+                       throw new InvalidProgramException("The semantic 
properties (forwarded fields) are automatically calculated.");
                }
                
                @Override
                protected DualInputSemanticProperties 
extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
                        // we do not extract anything, but construct the 
properties from the projection
-                       return 
SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), 
getFunction().getIsFromFirst());
+                       return 
SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), 
getFunction().getIsFromFirst(),
+                                       getInput1Type(), getInput2Type());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index d30dc5e..d570fc2 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -96,7 +95,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                                SingleInputSemanticProperties sProps = new 
SingleInputSemanticProperties();
                                
                                for (int field : 
keys.computeLogicalKeyPositions()) {
-                                       sProps.setForwardedField(field, new 
FieldSet(field));
+                                       sProps.addForwardedField(field, field);
                                }
                                
                                po.setSemanticProperties(sProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 822759f..e60314f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -39,6 +39,8 @@ import 
org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import 
org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
@@ -230,7 +232,27 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                                return 
super.extractSemanticAnnotationsFromUdf(function.getClass());
                        }
                }
-               
+
+               @Override
+               protected boolean 
udfWithForwardedFieldsFirstAnnotation(Class<?> udfClass) {
+
+                       if (function instanceof 
DefaultJoin.WrappingFlatJoinFunction) {
+                               return 
super.udfWithForwardedFieldsFirstAnnotation(((WrappingFunction<?>) 
function).getWrappedFunction().getClass());
+                       } else {
+                               return 
super.udfWithForwardedFieldsFirstAnnotation(function.getClass());
+                       }
+               }
+
+               @Override
+               protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) {
+
+                       if (function instanceof 
DefaultJoin.WrappingFlatJoinFunction) {
+                               return 
super.udfWithForwardedFieldsSecondAnnotation(((WrappingFunction<?>) 
function).getWrappedFunction().getClass());
+                       } else {
+                               return 
super.udfWithForwardedFieldsSecondAnnotation(function.getClass());
+                       }
+               }
+
                @Override
                protected JoinOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 
@@ -708,19 +730,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                }
 
                @Override
-               public JoinOperator<I1, I2, OUT> withConstantSetFirst(String... 
constantSetFirst) {
-                       throw new InvalidProgramException("The semantic 
properties (constant fields and forwarded fields) are automatically 
calculated.");
+               public JoinOperator<I1, I2, OUT> 
withForwardedFieldsFirst(String... forwardedFieldsFirst) {
+                       throw new InvalidProgramException("The semantic 
properties (forwarded fields) are automatically calculated.");
                }
 
                @Override
-               public JoinOperator<I1, I2, OUT> 
withConstantSetSecond(String... constantSetSecond) {
-                       throw new InvalidProgramException("The semantic 
properties (constant fields and forwarded fields) are automatically 
calculated.");
+               public JoinOperator<I1, I2, OUT> 
withForwardedFieldsSecond(String... forwardedFieldsSecond) {
+                       throw new InvalidProgramException("The semantic 
properties (forwarded fields) are automatically calculated.");
                }
                
                @Override
                protected DualInputSemanticProperties 
extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
                        // we do not extract the annotation, we construct the 
properties from the projection#
-                       return 
SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), 
getFunction().getIsFromFirst());
+                       return 
SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), 
getFunction().getIsFromFirst(),
+                                       getInput1Type(), getInput2Type());
                }
 
        }
@@ -968,7 +991,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
        // 
--------------------------------------------------------------------------------------------
        //  default join functions
        // 
--------------------------------------------------------------------------------------------
-       
+
+       @ForwardedFieldsFirst("*->0")
+       @ForwardedFieldsSecond("*->1")
        public static final class DefaultFlatJoinFunction<T1, T2> extends 
RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 5fca38a..30ff91a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -125,8 +125,7 @@ public abstract class Keys<T> {
                                if(keyType.isTupleType()) {
                                        // special case again:
                                        TupleTypeInfoBase<?> tupleKeyType = 
(TupleTypeInfoBase<?>) keyType;
-                                       List<FlatFieldDescriptor> keyTypeFields 
= new ArrayList<FlatFieldDescriptor>(tupleKeyType.getTotalFields());
-                                       
tupleKeyType.getKey(ExpressionKeys.SELECT_ALL_CHAR, 0, keyTypeFields);
+                                       List<FlatFieldDescriptor> keyTypeFields 
= tupleKeyType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR);
                                        if(expressionKeys.keyFields.size() != 
keyTypeFields.size()) {
                                                throw new 
IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
                                        }
@@ -207,7 +206,7 @@ public abstract class Keys<T> {
                // int-defined field
                public ExpressionKeys(int[] groupingFields, TypeInformation<T> 
type, boolean allowEmpty) {
                        if (!type.isTupleType()) {
-                               throw new InvalidProgramException("Specifying 
keys via field positions is only valid" +
+                               throw new InvalidProgramException("Specifying 
keys via field positions is only valid " +
                                                "for tuple data types. Type: " 
+ type);
                        }
 
@@ -288,8 +287,7 @@ public abstract class Keys<T> {
                        // extract the keys on their flat position
                        keyFields = new 
ArrayList<FlatFieldDescriptor>(expressions.length);
                        for (int i = 0; i < expressions.length; i++) {
-                               List<FlatFieldDescriptor> keys = new 
ArrayList<FlatFieldDescriptor>(); // use separate list to do a size check
-                               cType.getKey(expressions[i], 0, keys);
+                               List<FlatFieldDescriptor> keys = 
cType.getFlatFields(expressions[i]); // use separate list to do a size check
                                if(keys.size() == 0) {
                                        throw new 
IllegalArgumentException("Unable to extract key from expression 
'"+expressions[i]+"' on key "+cType);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 0b2aa95..bddef8f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
@@ -72,7 +73,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
                ppo.setInput(input);
                // set dop
                ppo.setDegreeOfParallelism(this.getParallelism());
-               
ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields));
+               
ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields,
 (CompositeType<?>) getInputType()));
 
                return ppo;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index 0d0cb15..f55489f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -105,27 +106,74 @@ public abstract class SingleInputUdfOperator<IN, OUT, O 
extends SingleInputUdfOp
        }
 
        /**
-        * Adds a constant-set annotation for the UDF.
-        * 
         * <p>
-        * Constant set annotations are used by the optimizer to infer the 
existence of data properties (sorted, partitioned, grouped).
-        * In certain cases, these annotations allow the optimizer to generate 
a more efficient execution plan which can lead to improved performance.
-        * Constant set annotations can only be specified if the second input 
and the output type of the UDF are of
-        * {@link org.apache.flink.api.java.tuple.Tuple} data types.
-        * 
+        * Adds semantic information about forwarded fields of the user-defined 
function.
+        * The forwarded fields information declares fields which are never 
modified by the function and
+        * which are forwarded at the same position to the output or copied 
unchanged to another position in the output.
+        * </p>
+        *
         * <p>
-        * A constant-set annotation is a set of constant field specifications. 
The constant field specification String "4->3" specifies, that this UDF copies 
the fourth field of 
-        * an input tuple to the third field of the output tuple. Field 
references are zero-indexed.
-        * 
+        * Fields that are forwarded at the same position are specified by 
their position.
+        * The specified position must be valid for the input and output data 
type and have the same type.
+        * For example <code>withForwardedFields("f2")</code> declares that the 
third field of a Java input tuple is
+        * copied to the third field of an output tuple.
+        * </p>
+        *
         * <p>
-        * <b>NOTICE: Constant set annotations are optional, but if given need 
to be correct. Otherwise, the program might produce wrong results!</b>
-        * 
-        * @param constantSet A list of constant field specification Strings.
-        * @return This operator with an annotated constant field set.
+        * Fields which are copied unchanged to another position in the output 
are declared by specifying the
+        * source field reference in the input and the target field reference 
in the output.
+        * <code>withForwardedFields("f0->f2")</code> denotes that the first 
field of the Java input tuple is
+        * copied unchanged to the third field of the Java output tuple. When 
using a wildcard ("*") ensure that
+        * the number of declared fields and their types in input and output 
type match.
+        * </p>
+        *
+        * <p>
+        * Multiple forwarded fields can be annotated in one 
(<code>withForwardedFields("f2; f3->f0; f4")</code>)
+        * or separate Strings (<code>withForwardedFields("f2", "f3->f0", 
"f4")</code>).
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field references such as nested fields and wildcard.
+        * </p>
+        *
+        * <p>
+        * It is not possible to override existing semantic information about 
forwarded fields which was
+        * for example added by a {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} class 
annotation.
+        * </p>
+        *
+        * <p>
+        * <b>NOTE: Adding semantic information for functions is optional!
+        * If used correctly, semantic information can help the Flink optimizer 
to generate more efficient execution plans.
+        * However, incorrect semantic information can cause the optimizer to 
generate incorrect execution plans which compute wrong results!
+        * So be careful when adding semantic information.
+        * </b>
+        * </p>
+        *
+        * @param forwardedFields A list of field forward expressions.
+        * @return This operator with annotated forwarded field information.
+        *
+        * @see org.apache.flink.api.java.functions.FunctionAnnotation
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
         */
-       public O withConstantSet(String... constantSet) {
-               SingleInputSemanticProperties props = 
SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, 
this.getInputType(), this.getResultType());
-               this.setSemanticProperties(props);
+       public O withForwardedFields(String... forwardedFields) {
+
+               if(this.udfSemantics == null) {
+                       // extract semantic properties from function annotations
+                       this.udfSemantics = 
extractSemanticAnnotations(getFunction().getClass());
+               }
+
+               if(this.udfSemantics == null) {
+                       this.udfSemantics = new SingleInputSemanticProperties();
+                       
SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, 
forwardedFields, null, null, this.getInputType(), this.getResultType());
+               } else {
+                       
if(udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
+                               // refuse semantic information as it would 
override the function annotation
+                               throw new 
SemanticProperties.InvalidSemanticAnnotationException("Forwarded field 
information " +
+                                               "has already been added by a 
function annotation for this operator. " +
+                                               "Cannot overwrite function 
annotations.");
+                       } else {
+                               
SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, 
forwardedFields, null, null, this.getInputType(), this.getResultType());
+                       }
+               }
+
                @SuppressWarnings("unchecked")
                O returnType = (O) this;
                return returnType;
@@ -263,9 +311,9 @@ public abstract class SingleInputUdfOperator<IN, OUT, O 
extends SingleInputUdfOp
 
        @Override
        public SingleInputSemanticProperties getSemanticProperties() {
-               if (udfSemantics == null) {
+               if (this.udfSemantics == null) {
                        SingleInputSemanticProperties props = 
extractSemanticAnnotations(getFunction().getClass());
-                       udfSemantics = props != null ? props : new 
SingleInputSemanticProperties();
+                       this.udfSemantics = props != null ? props : new 
SingleInputSemanticProperties();
                }
                
                return this.udfSemantics;
@@ -284,7 +332,17 @@ public abstract class SingleInputUdfOperator<IN, OUT, O 
extends SingleInputUdfOp
        }
        
        protected SingleInputSemanticProperties 
extractSemanticAnnotations(Class<?> udfClass) {
-               Set<Annotation> annotations = 
FunctionAnnotation.readSingleConstantAnnotations(udfClass);
+               Set<Annotation> annotations = 
FunctionAnnotation.readSingleForwardAnnotations(udfClass);
                return SemanticPropUtil.getSemanticPropsSingle(annotations, 
getInputType(), getResultType());
        }
+
+       protected boolean udfWithForwardedFieldsAnnotation(Class<?> udfClass) {
+
+               if 
(udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null ||
+                               
udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null) {
+                       return true;
+               } else {
+                       return false;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 2de9282..91f9f7e 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
@@ -107,64 +108,150 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, 
O extends TwoInputUdfOp
        }
 
        /**
-        * Adds a constant-set annotation for the first input of the UDF.
-        * 
         * <p>
-        * Constant set annotations are used by the optimizer to infer the 
existence of data properties (sorted, partitioned, grouped).
-        * In certain cases, these annotations allow the optimizer to generate 
a more efficient execution plan which can lead to improved performance.
-        * Constant set annotations can only be specified if the first input 
and the output type of the UDF are of
-        * {@link org.apache.flink.api.java.tuple.Tuple} data types.
-        * 
+        * Adds semantic information about forwarded fields of the first input 
of the user-defined function.
+        * The forwarded fields information declares fields which are never 
modified by the function and
+        * which are forwarded at the same position to the output or copied 
unchanged to another position in the output.
+        * </p>
+        *
         * <p>
-        * A constant-set annotation is a set of constant field specifications. 
The constant field specification String "4->3" specifies, that this UDF copies 
the fourth field of 
-        * an input tuple to the third field of the output tuple. Field 
references are zero-indexed.
-        * 
+        * Fields that are forwarded at the same position are specified by 
their position.
+        * The specified position must be valid for the input and output data 
type and have the same type.
+        * For example <code>withForwardedFieldsFirst("f2")</code> declares 
that the third field of a Java input tuple
+        * from the first input is copied to the third field of an output tuple.
+        * </p>
+        *
         * <p>
-        * <b>NOTICE: Constant set annotations are optional, but if given need 
to be correct. Otherwise, the program might produce wrong results!</b>
-        * 
-        * @param constantSetFirst A list of constant field specification 
Strings for the first input.
-        * @return This operator with an annotated constant field set for the 
first input.
+        * Fields which are copied unchanged from the first input to another 
position in the output are declared
+        * by specifying the source field reference in the first input and the 
target field reference in the output.
+        * <code>withForwardedFieldsFirst("f0->f2")</code> denotes that the 
first field of the first input Java tuple is
+        * copied unchanged to the third field of the Java output tuple. When 
using a wildcard ("*") ensure that
+        * the number of declared fields and their types in first input and 
output type match.
+        * </p>
+        *
+        * <p>
+        * Multiple forwarded fields can be annotated in one 
(<code>withForwardedFieldsFirst("f2; f3->f0; f4")</code>)
+        * or separate Strings (<code>withForwardedFieldsFirst("f2", "f3->f0", 
"f4")</code>).
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field references such as nested fields and wildcard.
+        * </p>
+        *
+        * <p>
+        * It is not possible to override existing semantic information about 
forwarded fields of the first input which was
+        * for example added by a {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} 
class annotation.
+        * </p>
+        *
+        * <p>
+        * <b>NOTE: Adding semantic information for functions is optional!
+        * If used correctly, semantic information can help the Flink optimizer 
to generate more efficient execution plans.
+        * However, incorrect semantic information can cause the optimizer to 
generate incorrect execution plans which compute wrong results!
+        * So be careful when adding semantic information.
+        * </b>
+        * </p>
+        *
+        * @param forwardedFieldsFirst A list of forwarded field expressions 
for the first input of the function.
+        * @return This operator with annotated forwarded field information.
+        *
+        * @see org.apache.flink.api.java.functions.FunctionAnnotation
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst
         */
        @SuppressWarnings("unchecked")
-       public O withConstantSetFirst(String... constantSetFirst) {
+       public O withForwardedFieldsFirst(String... forwardedFieldsFirst) {
                if (this.udfSemantics == null) {
+                       // extract semantic properties from function annotations
+                       this.udfSemantics = 
extractSemanticAnnotationsFromUdf(getFunction().getClass());
+               }
+
+               if(this.udfSemantics == null) {
                        this.udfSemantics = new DualInputSemanticProperties();
+                       
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, 
forwardedFieldsFirst, null,
+                                       null, null, null, null, 
getInput1Type(), getInput2Type(), getResultType());
+               } else {
+                       
if(this.udfWithForwardedFieldsFirstAnnotation(getFunction().getClass())) {
+                               // refuse semantic information as it would 
override the function annotation
+                               throw new 
SemanticProperties.InvalidSemanticAnnotationException("Forwarded field 
information " +
+                                               "has already been added by a 
function annotation for the first input of this operator. " +
+                                               "Cannot overwrite function 
annotations.");
+                       } else {
+                               
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, 
forwardedFieldsFirst, null,
+                                               null, null, null, null, 
getInput1Type(), getInput2Type(), getResultType());
+                       }
                }
-               
-               
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, 
constantSetFirst, null,
-                               null, null, null, null, this.getInput1Type(), 
this.getInput2Type(), this.getResultType());
 
                O returnType = (O) this;
                return returnType;
        }
-       
+
        /**
-        * Adds a constant-set annotation for the second input of the UDF.
-        * 
         * <p>
-        * Constant set annotations are used by the optimizer to infer the 
existence of data properties (sorted, partitioned, grouped).
-        * In certain cases, these annotations allow the optimizer to generate 
a more efficient execution plan which can lead to improved performance.
-        * Constant set annotations can only be specified if the second input 
and the output type of the UDF are of
-        * {@link org.apache.flink.api.java.tuple.Tuple} data types.
-        * 
+        * Adds semantic information about forwarded fields of the second input 
of the user-defined function.
+        * The forwarded fields information declares fields which are never 
modified by the function and
+        * which are forwarded at the same position to the output or copied 
unchanged to another position in the output.
+        * </p>
+        *
         * <p>
-        * A constant-set annotation is a set of constant field specifications. 
The constant field specification String "4->3" specifies, that this UDF copies 
the fourth field of 
-        * an input tuple to the third field of the output tuple. Field 
references are zero-indexed.
-        * 
+        * Fields that are forwarded at the same position are specified by 
their position.
+        * The specified position must be valid for the input and output data 
type and have the same type.
+        * For example <code>withForwardedFieldsSecond("f2")</code> declares 
that the third field of a Java input tuple
+        * from the second input is copied to the third field of an output 
tuple.
+        * </p>
+        *
         * <p>
-        * <b>NOTICE: Constant set annotations are optional, but if given need 
to be correct. Otherwise, the program might produce wrong results!</b>
-        * 
-        * @param constantSetSecond A list of constant field specification 
Strings for the second input.
-        * @return This operator with an annotated constant field set for the 
second input.
+        * Fields which are copied unchanged from the second input to another 
position in the output are declared
+        * by specifying the source field reference in the second input and the 
target field reference in the output.
+        * <code>withForwardedFieldsSecond("f0->f2")</code> denotes that the 
first field of the second input Java tuple is
+        * copied unchanged to the third field of the Java output tuple. When 
using a wildcard ("*") ensure that
+        * the number of declared fields and their types in second input and 
output type match.
+        * </p>
+        *
+        * <p>
+        * Multiple forwarded fields can be annotated in one 
(<code>withForwardedFieldsSecond("f2; f3->f0; f4")</code>)
+        * or separate Strings (<code>withForwardedFieldsSecond("f2", "f3->f0", 
"f4")</code>).
+        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * details on field references such as nested fields and wildcard.
+        * </p>
+        *
+        * <p>
+        * It is not possible to override existing semantic information about 
forwarded fields of the second input which was
+        * for example added by a {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} 
class annotation.
+        * </p>
+        *
+        * <p>
+        * <b>NOTE: Adding semantic information for functions is optional!
+        * If used correctly, semantic information can help the Flink optimizer 
to generate more efficient execution plans.
+        * However, incorrect semantic information can cause the optimizer to 
generate incorrect execution plans which compute wrong results!
+        * So be careful when adding semantic information.
+        * </b>
+        * </p>
+        *
+        * @param forwardedFieldsSecond A list of forwarded field expressions 
for the second input of the function.
+        * @return This operator with annotated forwarded field information.
+        *
+        * @see org.apache.flink.api.java.functions.FunctionAnnotation
+        * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond
         */
        @SuppressWarnings("unchecked")
-       public O withConstantSetSecond(String... constantSetSecond) {
+       public O withForwardedFieldsSecond(String... forwardedFieldsSecond) {
                if (this.udfSemantics == null) {
+                       // extract semantic properties from function annotations
+                       this.udfSemantics = 
extractSemanticAnnotationsFromUdf(getFunction().getClass());
+               }
+
+               if(this.udfSemantics == null) {
                        this.udfSemantics = new DualInputSemanticProperties();
+                       
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, 
forwardedFieldsSecond,
+                                       null, null, null, null, 
getInput1Type(), getInput2Type(), getResultType());
+               } else {
+                       
if(udfWithForwardedFieldsSecondAnnotation(getFunction().getClass())) {
+                               // refuse semantic information as it would 
override the function annotation
+                               throw new 
SemanticProperties.InvalidSemanticAnnotationException("Forwarded field 
information " +
+                                               "has already been added by a 
function annotation for the second input of this operator. " +
+                                               "Cannot overwrite function 
annotations.");
+                       } else {
+                               
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, 
forwardedFieldsSecond,
+                                               null, null, null, null, 
getInput1Type(), getInput2Type(), getResultType());
+                       }
                }
-               
-               
SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, 
constantSetSecond,
-                               null, null, null, null, this.getInput1Type(), 
this.getInput2Type(), this.getResultType());
 
                O returnType = (O) this;
                return returnType;
@@ -303,9 +390,9 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O 
extends TwoInputUdfOp
 
        @Override
        public DualInputSemanticProperties getSemanticProperties() {
-               if (udfSemantics == null) {
+               if (this.udfSemantics == null) {
                        DualInputSemanticProperties props = 
extractSemanticAnnotationsFromUdf(getFunction().getClass());
-                       udfSemantics = props != null ? props : new 
DualInputSemanticProperties();
+                       this.udfSemantics = props != null ? props : new 
DualInputSemanticProperties();
                }
                
                return this.udfSemantics;
@@ -325,7 +412,28 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O 
extends TwoInputUdfOp
        
        
        protected DualInputSemanticProperties 
extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
-               Set<Annotation> annotations = 
FunctionAnnotation.readDualConstantAnnotations(udfClass);
+               Set<Annotation> annotations = 
FunctionAnnotation.readDualForwardAnnotations(udfClass);
                return SemanticPropUtil.getSemanticPropsDual(annotations, 
getInput1Type(), getInput2Type(), getResultType());
        }
+
+       protected boolean udfWithForwardedFieldsFirstAnnotation(Class<?> 
udfClass) {
+
+               if 
(udfClass.getAnnotation(FunctionAnnotation.ForwardedFieldsFirst.class) != null 
||
+                               
udfClass.getAnnotation(FunctionAnnotation.NonForwardedFieldsFirst.class) != 
null) {
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       protected boolean udfWithForwardedFieldsSecondAnnotation(Class<?> 
udfClass) {
+
+               if 
(udfClass.getAnnotation(FunctionAnnotation.ForwardedFieldsSecond.class) != null 
||
+                               
udfClass.getAnnotation(FunctionAnnotation.NonForwardedFieldsSecond.class) != 
null) {
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index 15b4719..1e99e2d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-
+@ForwardedFields("*->1")
 public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, 
Tuple2<K, T>> {
        
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index e139697..e61c0de 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -19,9 +19,10 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-
+@ForwardedFields("1->*")
 public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, 
T>, T> {
        
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index 45fb74f..77dbad6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -23,9 +23,10 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.FilterOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.util.Collector;
 
-
+@ForwardedFields("*")
 public class PlanFilterOperator<T> extends FilterOperatorBase<T, 
FlatMapFunction<T, T>> {
        
        public PlanFilterOperator(FilterFunction<T> udf, String name, 
TypeInformation<T> type) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
index 1e73b92..d7a17b8 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
@@ -248,8 +248,7 @@ public class FunctionAnnotation {
        public @interface ConstantFieldsSecondExcept {
                int[] value();
        }
-       
-       
+
        /**
         * Private constructor to prevent instantiation. This class is intended 
only as a container.
         */
@@ -278,8 +277,7 @@ public class FunctionAnnotation {
                
                // extract notConstantSet from annotation
                if (allConstants != null) {
-                       FieldSet nonConstant = new FieldSet();
-                       return new 
ImplicitlyForwardingSingleInputSemanticProperties(nonConstant);
+                       return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
                }
                
                SingleInputSemanticProperties semanticProperties = new 
SingleInputSemanticProperties();
@@ -329,13 +327,13 @@ public class FunctionAnnotation {
                // extract readSets from annotations
                if (constantSet1Annotation != null) {
                        for(int value: constantSet1Annotation.value()) {
-                               semanticProperties.addForwardedField1(value, 
value);
+                               semanticProperties.addForwardedField(0, value, 
value);
                        }
                }
                
                if (constantSet2Annotation != null) {
                        for(int value: constantSet2Annotation.value()) {
-                               semanticProperties.addForwardedField2(value, 
value);
+                               semanticProperties.addForwardedField(1, value, 
value);
                        }
                }
                
@@ -344,73 +342,66 @@ public class FunctionAnnotation {
 
 
        private static final class 
ImplicitlyForwardingSingleInputSemanticProperties extends 
SingleInputSemanticProperties {
-               private static final long serialVersionUID = 1L;
+
+               private static final long serialVersionUID = 1l;
 
                private FieldSet nonForwardedFields;
 
                private 
ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) {
                        this.nonForwardedFields = nonForwardedFields;
-                       addWrittenFields(nonForwardedFields);
                }
 
-
-               /**
-                * Returns the logical position where the given field is 
written to.
-                * In this variant of the semantic properties, all fields are 
assumed implicitly forwarded,
-                * unless stated otherwise. We return the same field position, 
unless the field is explicitly
-                * marked as modified.
-                */
                @Override
-               public FieldSet getForwardedField(int sourceField) {
-                       if (isAllFieldsConstant()) {
-                               return new FieldSet(sourceField);
+               public FieldSet getForwardingTargetFields(int input, int 
sourceField) {
+
+                       if (input != 0) {
+                               throw new IndexOutOfBoundsException();
                        }
 
-                       if (this.nonForwardedFields.contains(sourceField)) {
-                               return null;
+                       if (nonForwardedFields == null) {
+                               return super.getForwardingTargetFields(input, 
sourceField);
                        } else {
-                               return new FieldSet(sourceField);
+                               if 
(this.nonForwardedFields.contains(sourceField)) {
+                                       return FieldSet.EMPTY_SET;
+                               } else {
+                                       return new FieldSet(sourceField);
+                               }
                        }
                }
 
                @Override
-               public FieldSet getSourceField(int input, int field) {
+               public int getForwardingSourceField(int input, int targetField) 
{
+
                        if (input != 0) {
                                throw new IndexOutOfBoundsException();
                        }
 
-                       if (isAllFieldsConstant()) {
-                               return new FieldSet(field);
-                       }
-
-                       if (this.nonForwardedFields == null) {
-                               return super.getSourceField(input, field);
-                       }
-
-                       if (this.nonForwardedFields.contains(field)) {
-                               return null;
+                       if (nonForwardedFields == null) {
+                               return super.getForwardingSourceField(input, 
targetField);
                        } else {
-                               return new FieldSet(field);
+                               if 
(this.nonForwardedFields.contains(targetField)) {
+                                       return -1;
+                               } else {
+                                       return targetField;
+                               }
                        }
                }
 
                @Override
-               public void addForwardedField(int sourceField, int 
destinationField) {
-                       throw new UnsupportedOperationException("When defining 
fields as implicitly constant " +
-                                       "(such as through the 
ConstantFieldsExcept annotation), you cannot manually add forwarded fields.");
+               public FieldSet getReadFields(int input) {
+                       return null;
                }
 
                @Override
-               public void addForwardedField(int sourceField, FieldSet 
destinationFields) {
-                       throw new UnsupportedOperationException("When defining 
fields as implicitly constant " +
-                                       "(such as through the 
ConstantFieldsExcept annotation), you cannot manually add forwarded fields.");
+               public void addForwardedField(int sourceField, int 
destinationField) {
+                       if (this.nonForwardedFields == null) {
+                               super.addForwardedField(sourceField, 
destinationField);
+                       } else {
+                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant " +
+                                               "(such as through the 
ConstantFieldsExcept annotation), you cannot manually add forwarded 
fields.");
+                       }
                }
 
-               @Override
-               public void setForwardedField(int sourceField, FieldSet 
destinationFields) {
-                       throw new UnsupportedOperationException("When defining 
fields as implicitly constant " +
-                                       "(such as through the 
ConstantFieldsExcept annotation), you cannot manually add forwarded fields.");
-               }
        }
 
        private static final class 
ImplicitlyForwardingTwoInputSemanticProperties extends 
DualInputSemanticProperties {
@@ -421,7 +412,6 @@ public class FunctionAnnotation {
 
                private ImplicitlyForwardingTwoInputSemanticProperties() {}
 
-
                public void setImplicitlyForwardingFirstExcept(FieldSet 
nonForwardedFields) {
                        this.nonForwardedFields1 = nonForwardedFields;
                }
@@ -430,136 +420,63 @@ public class FunctionAnnotation {
                        this.nonForwardedFields2 = nonForwardedFields;
                }
 
-
                @Override
-               public FieldSet getForwardedField1(int sourceField) {
-                       if (isAllFieldsConstant()) {
-                               return new FieldSet(sourceField);
-                       }
+               public FieldSet getForwardingTargetFields(int input, int 
sourceField) {
 
-                       if (this.nonForwardedFields1 == null) {
-                               return super.getForwardedField1(sourceField);
-                       }
-                       else {
-                               if 
(this.nonForwardedFields1.contains(sourceField)) {
-                                       return null;
-                               } else {
-                                       return new FieldSet(sourceField);
-                               }
-                       }
-               }
+                       if(input != 0 && input != 1) {
+                               throw new IndexOutOfBoundsException();
+                       } else if (input == 0) {
 
-               @Override
-               public FieldSet getForwardedField2(int sourceField) {
-                       if (isAllFieldsConstant()) {
-                               return new FieldSet(sourceField);
-                       }
+                               if (this.nonForwardedFields1 == null) {
+                                       return 
super.getForwardingTargetFields(0, sourceField);
+                               }
+                               else {
+                                       if 
(this.nonForwardedFields1.contains(sourceField)) {
+                                               return FieldSet.EMPTY_SET;
+                                       } else {
+                                               return new 
FieldSet(sourceField);
+                                       }
+                               }
+                       } else {
 
-                       if (this.nonForwardedFields2 == null) {
-                               return super.getForwardedField2(sourceField);
-                       }
-                       else {
-                               if 
(this.nonForwardedFields2.contains(sourceField)) {
-                                       return null;
-                               } else {
-                                       return new FieldSet(sourceField);
+                               if (this.nonForwardedFields2 == null) {
+                                       return 
super.getForwardingTargetFields(1, sourceField);
+                               }
+                               else {
+                                       if 
(this.nonForwardedFields2.contains(sourceField)) {
+                                               return FieldSet.EMPTY_SET;
+                                       } else {
+                                               return new 
FieldSet(sourceField);
+                                       }
                                }
                        }
                }
 
                @Override
-               public FieldSet getSourceField(int input, int field) {
+               public void addForwardedField(int input, int sourceField, int 
destinationField) {
                        if (input != 0 && input != 1) {
                                throw new IndexOutOfBoundsException();
-                       }
-
-                       if (isAllFieldsConstant()) {
-                               return new FieldSet(field);
-                       }
-
-                       if (this.nonForwardedFields1 == null && 
this.nonForwardedFields2 == null) {
-                               return super.getSourceField(input, field);
-                       }
-
-                       if (input == 0 && this.nonForwardedFields1 != null && 
this.nonForwardedFields1.contains(field)) {
-                               return null;
                        } else if (input == 0) {
-                               return new FieldSet(field);
-                       }
-
-                       if (input == 1 && this.nonForwardedFields2 != null && 
this.nonForwardedFields2.contains(field)) {
-                               return null;
-                       } else if (input == 1) {
-                               return new FieldSet(field);
-                       }
-
-                       return null;
-               }
-
-               @Override
-               public void addForwardedField1(int sourceField, int 
destinationField) {
-                       if (this.nonForwardedFields1 == null) {
-                               super.addForwardedField1(sourceField, 
destinationField);
-                       }
-                       else {
-                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant for an input" +
-                                               "(such as through the 
ConstantFieldsFirstExcept annotation), you cannot manually add forwarded 
fields.");
-                       }
-               }
-
-               @Override
-               public void addForwardedField1(int sourceField, FieldSet 
destinationFields) {
-                       if (this.nonForwardedFields1 == null) {
-                               super.addForwardedField1(sourceField, 
destinationFields);
-                       }
-                       else {
-                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant for an input" +
-                                               "(such as through the 
ConstantFieldsFirstExcept annotation), you cannot manually add forwarded 
fields.");
-                       }
-               }
-
-               @Override
-               public void setForwardedField1(int sourceField, FieldSet 
destinationFields) {
-                       if (this.nonForwardedFields1 == null) {
-                               super.addForwardedField1(sourceField, 
destinationFields);
-                       }
-                       else {
-                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant for an input" +
-                                               "(such as through the 
ConstantFieldsFirstExcept annotation), you cannot manually add forwarded 
fields.");
-                       }
-               }
-
-               @Override
-               public void addForwardedField2(int sourceField, int 
destinationField) {
-                       if (this.nonForwardedFields2 == null) {
-                               super.addForwardedField2(sourceField, 
destinationField);
-                       }
-                       else {
-                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant for an input" +
-                                               "(such as through the 
ConstantFieldsSecondExcept annotation), you cannot manually add forwarded 
fields.");
+                               if (this.nonForwardedFields1 == null) {
+                                       super.addForwardedField(0, sourceField, 
destinationField);
+                               } else {
+                                       throw new 
UnsupportedOperationException("When defining fields as implicitly constant for 
an input " +
+                                                       "(such as through the 
ConstantFieldsFirstExcept annotation), you cannot manually add forwarded 
fields.");
+                               }
+                       } else {
+                               if (this.nonForwardedFields2 == null) {
+                                       super.addForwardedField(1, sourceField, 
destinationField);
+                               } else {
+                                       throw new 
UnsupportedOperationException("When defining fields as implicitly constant for 
an input " +
+                                                       "(such as through the 
ConstantFieldsSecondExcept annotation), you cannot manually add forwarded 
fields.");
+                               }
                        }
                }
 
                @Override
-               public void addForwardedField2(int sourceField, FieldSet 
destinationFields) {
-                       if (this.nonForwardedFields2 == null) {
-                               super.addForwardedField2(sourceField, 
destinationFields);
-                       }
-                       else {
-                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant for an input" +
-                                               "(such as through the 
ConstantFieldsSecondExcept annotation), you cannot manually add forwarded 
fields.");
-                       }
+               public FieldSet getReadFields(int input) {
+                       return null;
                }
 
-               @Override
-               public void setForwardedField2(int sourceField, FieldSet 
destinationFields) {
-                       if (this.nonForwardedFields2 == null) {
-                               super.addForwardedField2(sourceField, 
destinationFields);
-                       }
-                       else {
-                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant for an input" +
-                                               "(such as through the 
ConstantFieldsSecondExcept annotation), you cannot manually add forwarded 
fields.");
-                       }
-               }
        }
 }

Reply via email to