http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index 7c80bf8..362cf9b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -20,395 +20,558 @@ package org.apache.flink.api.java.functions; import java.lang.annotation.Annotation; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsExcept; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirstExcept; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecondExcept; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFieldsSecond; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond; +import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; public class SemanticPropUtil { - private final static String REGEX_LIST = "(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))"; - private final static String REGEX_FORWARD = "(\\s*(\\d+)\\s*->(" + REGEX_LIST + "|(\\*)))"; - private final static String REGEX_LIST_OR_FORWARD = "(" + REGEX_LIST + "|" + REGEX_FORWARD + ")"; - private final static String REGEX_ANNOTATION = "(\\s*(" + REGEX_LIST_OR_FORWARD + "\\s*;\\s*)*(" + REGEX_LIST_OR_FORWARD + "\\s*))"; + private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]"; + private final static String REGEX_SINGLE_FIELD = "[\\p{L}\\p{Digit}_\\$]+"; + private final static String REGEX_NESTED_FIELDS = "((" + REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\."+ REGEX_WILDCARD +")?"; + private final static String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + ";)*(" + REGEX_NESTED_FIELDS + ");?)"; + private final static String REGEX_FORWARD = "(("+ REGEX_NESTED_FIELDS +"|"+ REGEX_WILDCARD +")->(" + REGEX_NESTED_FIELDS + "|"+ REGEX_WILDCARD +"))"; + private final static String REGEX_FIELD_OR_FORWARD = "(" + REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")"; + private final static String REGEX_ANNOTATION = "((" + REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)"; + + private static final Pattern PATTERN_WILDCARD = Pattern.compile(REGEX_WILDCARD); private static final Pattern PATTERN_FORWARD = Pattern.compile(REGEX_FORWARD); private static final Pattern PATTERN_ANNOTATION = Pattern.compile(REGEX_ANNOTATION); private static final Pattern PATTERN_LIST = Pattern.compile(REGEX_LIST); + private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_NESTED_FIELDS); + + public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType) + { - private static final Pattern PATTERN_DIGIT = Pattern.compile("\\d+"); + Character.isJavaIdentifierStart(1); - public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields) { SingleInputSemanticProperties ssp = new SingleInputSemanticProperties(); - for (int i = 0; i < fields.length; i++) { - ssp.addForwardedField(fields[i], i); + + int[] sourceOffsets = new int[inType.getArity()]; + sourceOffsets[0] = 0; + for(int i=1; i<inType.getArity(); i++) { + sourceOffsets[i] = inType.getTypeAt(i-1).getTotalFields() + sourceOffsets[i-1]; + } + + int targetOffset = 0; + for(int i=0; i<fields.length; i++) { + int sourceOffset = sourceOffsets[fields[i]]; + int numFieldsToCopy = inType.getTypeAt(fields[i]).getTotalFields(); + + for(int j=0; j<numFieldsToCopy; j++) { + ssp.addForwardedField(sourceOffset+j, targetOffset+j); + } + targetOffset += numFieldsToCopy; } + return ssp; } - public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) { + public static DualInputSemanticProperties createProjectionPropertiesDual( + int[] fields, boolean[] isFromFirst, TypeInformation<?> inType1, TypeInformation<?> inType2) + { DualInputSemanticProperties dsp = new DualInputSemanticProperties(); - for (int i = 0; i < fields.length; i++) { - if (isFromFirst[i]) { - dsp.addForwardedField1(fields[i], i); + int[] sourceOffsets1; + if(inType1 instanceof TupleTypeInfo<?>) { + sourceOffsets1 = new int[inType1.getArity()]; + sourceOffsets1[0] = 0; + for(int i=1; i<inType1.getArity(); i++) { + sourceOffsets1[i] = ((TupleTypeInfo<?>)inType1).getTypeAt(i-1).getTotalFields() + sourceOffsets1[i-1]; + } + } else { + sourceOffsets1 = new int[] {0}; + } + + int[] sourceOffsets2; + if(inType2 instanceof TupleTypeInfo<?>) { + sourceOffsets2 = new int[inType2.getArity()]; + sourceOffsets2[0] = 0; + for(int i=1; i<inType2.getArity(); i++) { + sourceOffsets2[i] = ((TupleTypeInfo<?>)inType2).getTypeAt(i-1).getTotalFields() + sourceOffsets2[i-1]; + } + } else { + sourceOffsets2 = new int[] {0}; + } + + int targetOffset = 0; + for(int i=0; i<fields.length; i++) { + int sourceOffset; + int numFieldsToCopy; + int input; + if(isFromFirst[i]) { + input = 0; + if (fields[i] == -1) { + sourceOffset = 0; + numFieldsToCopy = inType1.getTotalFields(); + } else { + sourceOffset = sourceOffsets1[fields[i]]; + numFieldsToCopy = ((TupleTypeInfo<?>)inType1).getTypeAt(fields[i]).getTotalFields(); + } } else { - dsp.addForwardedField2(fields[i], i); + input = 1; + if (fields[i] == -1) { + sourceOffset = 0; + numFieldsToCopy = inType2.getTotalFields(); + } else { + sourceOffset = sourceOffsets2[fields[i]]; + numFieldsToCopy = ((TupleTypeInfo<?>)inType2).getTypeAt(fields[i]).getTotalFields(); + } + } + + for(int j=0; j<numFieldsToCopy; j++) { + dsp.addForwardedField(input, sourceOffset+j, targetOffset+j); } + targetOffset += numFieldsToCopy; } + return dsp; } - public static SingleInputSemanticProperties getSemanticPropsSingle(Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) { + public static SingleInputSemanticProperties getSemanticPropsSingle( + Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) { if (set == null) { - return null; + return new SingleInputSemanticProperties(); } Iterator<Annotation> it = set.iterator(); - SingleInputSemanticProperties result = null; - //non tuple types are not yet supported for annotations - if (!inType.isTupleType() || !outType.isTupleType()) { - return null; - } + String[] forwarded = null; + String[] nonForwarded = null; + String[] read = null; while (it.hasNext()) { - if (result == null) { - result = new SingleInputSemanticProperties(); - } Annotation ann = it.next(); - if (ann instanceof ConstantFields) { - ConstantFields cf = (ConstantFields) ann; - parseConstantFields(cf.value(), result, inType, outType); - } else if (ann instanceof ConstantFieldsExcept) { - ConstantFieldsExcept cfe = (ConstantFieldsExcept) ann; - parseConstantFieldsExcept(cfe.value(), result, inType, outType); + if (ann instanceof ForwardedFields) { + forwarded = ((ForwardedFields) ann).value(); + } else if (ann instanceof NonForwardedFields) { + nonForwarded = ((NonForwardedFields) ann).value(); } else if (ann instanceof ReadFields) { - ReadFields rf = (ReadFields) ann; - parseReadFields(rf.value(), result, inType, outType); + read = ((ReadFields) ann).value(); + } else if (ann instanceof ForwardedFieldsFirst || ann instanceof ForwardedFieldsSecond || + ann instanceof NonForwardedFieldsFirst || ann instanceof NonForwardedFieldsSecond || + ann instanceof ReadFieldsFirst || ann instanceof ReadFieldsSecond) { + throw new InvalidSemanticAnnotationException("Annotation "+ann.getClass()+" invalid for single input function."); } } - return result; - } - private static void parseConstantFields(String[] cf, SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (cf == null) { - return; - } - for (String s : cf) { - if (s != null) { - readConstantSet(sm, s, inType, outType, 0); - } + if(forwarded != null || nonForwarded != null || read != null) { + SingleInputSemanticProperties result = new SingleInputSemanticProperties(); + getSemanticPropsSingleFromString(result, forwarded, nonForwarded, read, inType, outType); + return result; + } else { + return new SingleInputSemanticProperties(); } } - private static void readConstantSet(SemanticProperties sp, String s, TypeInformation<?> inType, TypeInformation<?> outType, int input) { - if (s.equals("*")) { - if (sp instanceof SingleInputSemanticProperties) { - for (int i = 0; i < inType.getArity() && i < outType.getArity(); i++) { - ((SingleInputSemanticProperties) sp).addForwardedField(i, i); - } - } else if (sp instanceof DualInputSemanticProperties) { - for (int i = 0; i < inType.getArity() && i < outType.getArity(); i++) { - if (input == 0) { - ((DualInputSemanticProperties) sp).addForwardedField1(i, i); - } else if (input == 1) { - ((DualInputSemanticProperties) sp).addForwardedField2(i, i); - } - } - } - return; + public static DualInputSemanticProperties getSemanticPropsDual( + Set<Annotation> set, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) { + if (set == null) { + return new DualInputSemanticProperties(); } + Iterator<Annotation> it = set.iterator(); - Matcher matcher = PATTERN_ANNOTATION.matcher(s); + String[] forwardedFirst = null; + String[] forwardedSecond = null; + String[] nonForwardedFirst = null; + String[] nonForwardedSecond = null; + String[] readFirst = null; + String[] readSecond = null; - if (!matcher.matches()) { - throw new InvalidProgramException("Unrecognized annotation string format."); - } + while (it.hasNext()) { + Annotation ann = it.next(); - Matcher forwardMatcher = PATTERN_FORWARD.matcher(s); - while (forwardMatcher.find()) { - int sourceField = Integer.valueOf(forwardMatcher.group(2)); - if (!isValidField(inType, sourceField)) { - throw new IndexOutOfBoundsException("Annotation: Field " + sourceField + " not available in the input tuple."); + if (ann instanceof ForwardedFieldsFirst) { + forwardedFirst = ((ForwardedFieldsFirst) ann).value(); + } else if (ann instanceof ForwardedFieldsSecond) { + forwardedSecond = ((ForwardedFieldsSecond) ann).value(); + } else if (ann instanceof NonForwardedFieldsFirst) { + nonForwardedFirst = ((NonForwardedFieldsFirst) ann).value(); + } else if (ann instanceof NonForwardedFieldsSecond) { + nonForwardedSecond = ((NonForwardedFieldsSecond) ann).value(); + } else if (ann instanceof ReadFieldsFirst) { + readFirst = ((ReadFieldsFirst) ann).value(); + } else if (ann instanceof ReadFieldsSecond) { + readSecond = ((ReadFieldsSecond) ann).value(); + } else if (ann instanceof ForwardedFields || ann instanceof NonForwardedFields || ann instanceof ReadFields) { + throw new InvalidSemanticAnnotationException("Annotation " + ann.getClass() + " invalid for dual input function."); } + } - if (forwardMatcher.group(7) != null) { - if (sp instanceof SingleInputSemanticProperties) { - for (int i = 0; i < outType.getArity(); i++) { - ((SingleInputSemanticProperties) sp).addForwardedField(sourceField, i); - } - } else if (sp instanceof DualInputSemanticProperties) { - for (int i = 0; i < outType.getArity(); i++) { - if (input == 0) { - ((DualInputSemanticProperties) sp).addForwardedField1(sourceField, i); - } else if (input == 1) { - ((DualInputSemanticProperties) sp).addForwardedField2(sourceField, i); - } - } - } - continue; - } - String found = forwardMatcher.group(4); - FieldSet fs = readFieldSetFromString(found, inType, outType); + if(forwardedFirst != null || nonForwardedFirst != null || readFirst != null || + forwardedSecond != null ||nonForwardedSecond != null || readSecond != null) { + DualInputSemanticProperties result = new DualInputSemanticProperties(); + getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond, + nonForwardedFirst, nonForwardedSecond, readFirst, readSecond, inType1, inType2, outType); + return result; + } else { + return new DualInputSemanticProperties(); + } + } - if (sp instanceof SingleInputSemanticProperties) { - ((SingleInputSemanticProperties) sp).addForwardedField(sourceField, fs); - } else if (sp instanceof DualInputSemanticProperties) { - if (input == 0) { - ((DualInputSemanticProperties) sp).addForwardedField1(sourceField, fs); - } else if (input == 1) { - ((DualInputSemanticProperties) sp).addForwardedField2(sourceField, fs); - } - } + public static void getSemanticPropsSingleFromString(SingleInputSemanticProperties result, + String[] forwarded, String[] nonForwarded, String[] readSet, + TypeInformation<?> inType, TypeInformation<?> outType) + { + + boolean hasForwardedAnnotation = false; + boolean hasNonForwardedAnnotation = false; + // check for forwarded annotations + if(forwarded != null && forwarded.length > 0) { + hasForwardedAnnotation = true; } - s = forwardMatcher.replaceAll(""); - - Matcher listMatcher = PATTERN_LIST.matcher(s); - - while (listMatcher.find()) { - String list = listMatcher.group(); - FieldSet fs = readFieldSetFromString(list, inType, outType); - for (int i : fs) { - if (sp instanceof SingleInputSemanticProperties) { - ((SingleInputSemanticProperties) sp).addForwardedField(i, i); - } else if (sp instanceof DualInputSemanticProperties) { - if (input == 0) { - ((DualInputSemanticProperties) sp).addForwardedField1(i, i); - } else if (input == 1) { - ((DualInputSemanticProperties) sp).addForwardedField2(i, i); - } - } - } + // check non-forwarded annotations + if(nonForwarded != null && nonForwarded.length > 0) { + hasNonForwardedAnnotation = true; } + + if(hasForwardedAnnotation && hasNonForwardedAnnotation) { + throw new InvalidSemanticAnnotationException("Either ForwardedFields OR " + + "NonForwardedFields annotation permitted, NOT both."); + } else if(hasForwardedAnnotation) { + parseForwardedFields(result, forwarded, inType, outType, 0); + } else if(hasNonForwardedAnnotation) { + parseNonForwardedFields(result, nonForwarded, inType, outType, 0); + } + parseReadFields(result, readSet, inType, 0); } - private static void parseConstantFieldsFirst(String[] cff, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (cff == null) { - return; + public static void getSemanticPropsDualFromString(DualInputSemanticProperties result, + String[] forwardedFirst, String[] forwardedSecond, + String[] nonForwardedFirst, String[] nonForwardedSecond, String[] + readFieldsFirst, String[] readFieldsSecond, + TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) + { + + boolean hasForwardedFirstAnnotation = false; + boolean hasForwardedSecondAnnotation = false; + boolean hasNonForwardedFirstAnnotation = false; + boolean hasNonForwardedSecondAnnotation = false; + // check for forwarded annotations + if(forwardedFirst != null && forwardedFirst.length > 0) { + hasForwardedFirstAnnotation = true; + } + if(forwardedSecond != null && forwardedSecond.length > 0) { + hasForwardedSecondAnnotation = true; + } + // check non-forwarded annotations + if(nonForwardedFirst != null && nonForwardedFirst.length > 0) { + hasNonForwardedFirstAnnotation = true; + } + if(nonForwardedSecond != null && nonForwardedSecond.length > 0) { + hasNonForwardedSecondAnnotation = true; } - for (String s : cff) { - if (s != null) { - readConstantSet(dm, s, inType, outType, 0); - } + if(hasForwardedFirstAnnotation && hasNonForwardedFirstAnnotation) { + throw new InvalidSemanticAnnotationException("Either ForwardedFieldsFirst OR " + + "NonForwardedFieldsFirst annotation permitted, NOT both."); + } + if(hasForwardedSecondAnnotation && hasNonForwardedSecondAnnotation) { + throw new InvalidSemanticAnnotationException("Either ForwardedFieldsSecond OR " + + "NonForwardedFieldsSecond annotation permitted, NOT both."); } - } - private static void parseConstantFieldsSecond(String[] cfs, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (cfs == null) { - return; + if(hasForwardedFirstAnnotation) { + parseForwardedFields(result, forwardedFirst, inType1, outType, 0); + } else if(hasNonForwardedFirstAnnotation) { + parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0); } - for (String s : cfs) { - if (s != null) { - readConstantSet(dm, s, inType, outType, 1); - } + if(hasForwardedSecondAnnotation) { + parseForwardedFields(result, forwardedSecond, inType2, outType, 1); + } else if(hasNonForwardedSecondAnnotation) { + parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1); } + + parseReadFields(result, readFieldsFirst, inType1, 0); + parseReadFields(result, readFieldsSecond, inType2, 1); } - private static void parseConstantFieldsFirstExcept(String[] cffe, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (cffe == null) { + + private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) { + + if (forwardedStr == null) { return; } - for (String str : cffe) { - if (str == null) { + for (String s : forwardedStr) { + if (s == null) { continue; } - FieldSet fs = readFieldSetFromString(str, inType, outType); + // remove white characters + s = s.replaceAll("\\s", ""); + + Matcher wcMatcher = PATTERN_WILDCARD.matcher(s); + // simple wildcard + if (wcMatcher.matches()) { - for (int i = 0; i < outType.getArity(); i++) { - if (!fs.contains(i)) { - dm.addForwardedField1(i, i); + if (!inType.equals(outType)) { + throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s + + "\" with wildcard only allowed for identical input and output types."); } - } - } - } - private static void parseConstantFieldsSecondExcept(String[] cfse, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (cfse == null) { - return; - } + for (int i = 0; i < inType.getTotalFields(); i++) { + if (sp instanceof SingleInputSemanticProperties) { + ((SingleInputSemanticProperties) sp).addForwardedField(i, i); + } else if (sp instanceof DualInputSemanticProperties) { + ((DualInputSemanticProperties) sp).addForwardedField(input, i, i); + } + } + return; + } - for (String str : cfse) { - if (str == null) { - continue; + // check format of annotation string + Matcher matcher = PATTERN_ANNOTATION.matcher(s); + if (!matcher.matches()) { + throw new InvalidSemanticAnnotationException("Invalid format of forwarded field annotation \"" + s + "\"."); } - FieldSet fs = readFieldSetFromString(str, inType, outType); + // add forward annotations "->" + Matcher forwardMatcher = PATTERN_FORWARD.matcher(s); + while (forwardMatcher.find()) { + String sourceStr = forwardMatcher.group(2); + String targetStr = forwardMatcher.group(6); - for (int i = 0; i < outType.getArity(); i++) { - if (!fs.contains(i)) { - dm.addForwardedField2(i, i); + try { + // check type compatibility + if (!areFieldsCompatible(sourceStr, inType, targetStr, outType)) { + throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match."); + } + List<FlatFieldDescriptor> inFFDs = getFlatFields(sourceStr, inType); + List<FlatFieldDescriptor> outFFDs = getFlatFields(targetStr, outType); + if (sp instanceof SingleInputSemanticProperties) { + for (int i = 0; i < inFFDs.size(); i++) { + int sourceField = inFFDs.get(i).getPosition(); + int targetField = outFFDs.get(i).getPosition(); + ((SingleInputSemanticProperties) sp).addForwardedField(sourceField, targetField); + } + } else if (sp instanceof DualInputSemanticProperties) { + for (int i = 0; i < inFFDs.size(); i++) { + int sourceField = inFFDs.get(i).getPosition(); + int targetField = outFFDs.get(i).getPosition(); + ((DualInputSemanticProperties) sp).addForwardedField(input, sourceField, targetField); + } + } + } catch (InvalidFieldReferenceException ifre) { + throw new InvalidSemanticAnnotationException("Invalid field reference in forwarded field annotation \"" + sourceStr + "->" + targetStr + "\".", ifre); + } catch (InvalidSemanticAnnotationException isae) { + throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + sourceStr + "->" + targetStr + "\" could not be added.", isae); + } + } + // remove forward annotations + s = forwardMatcher.replaceAll(""); + + // add forwarded annotations + Matcher listMatcher = PATTERN_LIST.matcher(s); + while (listMatcher.find()) { + String list = listMatcher.group(); + Matcher fieldMatcher = PATTERN_FIELD.matcher(list); + + // for each nested field + while (fieldMatcher.find()) { + String fieldStr = fieldMatcher.group(); + try { + // check if field is compatible in input and output type + if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType)) { + throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match."); + } + // add flat field positions + List<FlatFieldDescriptor> inFFDs = getFlatFields(fieldStr, inType); + List<FlatFieldDescriptor> outFFDs = getFlatFields(fieldStr, outType); + for (int i = 0; i < inFFDs.size(); i++) { + int sourcePos = inFFDs.get(i).getPosition(); + int targetPos = outFFDs.get(i).getPosition(); + if (sp instanceof SingleInputSemanticProperties) { + ((SingleInputSemanticProperties) sp).addForwardedField(sourcePos, targetPos); + } else if (sp instanceof DualInputSemanticProperties) { + ((DualInputSemanticProperties) sp).addForwardedField(input, sourcePos, targetPos); + } + } + } catch (InvalidFieldReferenceException ifre) { + throw new InvalidSemanticAnnotationException("Invalid field reference in forwarded field annotation \"" + fieldStr + "\".", ifre); + } catch (InvalidSemanticAnnotationException isae) { + throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + fieldStr + "\" could not be added.", isae); + } } } } } - private static void parseReadFieldsFirst(String[] rf, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (rf == null) { + private static void parseNonForwardedFields( + SemanticProperties sp, String[] nonForwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input) { + + if(nonForwardedStr == null) { return; } - for (String str : rf) { - if (str != null) { - FieldSet fs = readFieldSetFromString(str, inType, outType); - dm.addReadFields1(fs); - } - } - } + FieldSet excludedFields = new FieldSet(); + for(String s : nonForwardedStr) { - private static void parseReadFieldsSecond(String[] rf, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (rf == null) { - return; - } + // remove white characters + s = s.replaceAll("\\s", ""); - for (String str : rf) { - if (str != null) { - FieldSet fs = readFieldSetFromString(str, inType, outType); - dm.addReadFields2(fs); + if (s.equals("")) { + continue; } - } - } - - private static boolean isValidField(TypeInformation<?> type, int field) { - return field >= 0 && field < type.getArity(); - } + if(!inType.equals(outType)) { + throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types."); + } - private static void parseConstantFieldsExcept(String[] cfe, SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (cfe == null) { - return; - } + Matcher matcher = PATTERN_LIST.matcher(s); + if (!matcher.matches()) { + throw new InvalidSemanticAnnotationException("Invalid format of non-forwarded fields annotation \""+s+"\"."); + } - for (String str : cfe) { - if (str != null) { - FieldSet fs = readFieldSetFromString(str, inType, outType); + // process individual fields + matcher = PATTERN_FIELD.matcher(s); + while (matcher.find()) { + String fieldStr = matcher.group(); - for (int i = 0; i < outType.getArity(); i++) { - if (!fs.contains(i)) { - sm.addForwardedField(i, i); + try { + // get and add all flat field positions + List<FlatFieldDescriptor> inFFDs = getFlatFields(fieldStr, inType); + for (FlatFieldDescriptor ffd : inFFDs) { + excludedFields = excludedFields.addField(ffd.getPosition()); } + } catch(InvalidFieldReferenceException ifre) { + throw new InvalidSemanticAnnotationException("Invalid field reference in non-forwarded fields annotation \""+fieldStr+"\".",ifre); } } } - } - private static FieldSet readFieldSetFromString(String s, TypeInformation<?> inType, TypeInformation<?> outType) { - Matcher matcher = PATTERN_LIST.matcher(s); - - if (!matcher.matches()) { - throw new InvalidProgramException("Unrecognized annotation string format."); - } - - matcher = PATTERN_DIGIT.matcher(s); - FieldSet fs = FieldSet.EMPTY_SET; - - while (matcher.find()) { - int field = Integer.valueOf(matcher.group()); - if (!isValidField(outType, field)) { - throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the output tuple."); - } - if (!isValidField(inType, field)) { - throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the input tuple."); + for(int i = 0; i < inType.getTotalFields(); i++) { + if(!excludedFields.contains(i)) { + if(sp instanceof SingleInputSemanticProperties) { + ((SingleInputSemanticProperties) sp).addForwardedField(i,i); + } else if(sp instanceof DualInputSemanticProperties) { + ((DualInputSemanticProperties) sp).addForwardedField(input, i, i); + } } - - fs = fs.addField(field); } - return fs; + } - private static void parseReadFields(String[] rf, SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> outType) { - if (rf == null) { + private static void parseReadFields(SemanticProperties sp, String[] readFieldStrings, TypeInformation<?> inType, int input) { + + if(readFieldStrings == null) { return; } - for (String str : rf) { - if (str != null) { - FieldSet fs = readFieldSetFromString(str, inType, outType); - sm.addReadFields(fs); + for(String s : readFieldStrings) { + + FieldSet readFields = new FieldSet(); + + // remove white characters + s = s.replaceAll("\\s", ""); + + Matcher wcMatcher = PATTERN_WILDCARD.matcher(s); + // simple wildcard + if (wcMatcher.matches()) { + // add all fields + for (int i = 0; i < inType.getTotalFields(); i++) { + readFields = readFields.addField(i); + } + } else { + // process field list + Matcher matcher = PATTERN_LIST.matcher(s); + + if (!matcher.matches()) { + throw new InvalidSemanticAnnotationException("Invalid format of read field annotation \"" + s + "\"."); + } + + // process field + matcher = PATTERN_FIELD.matcher(s); + while (matcher.find()) { + String fieldStr = matcher.group(); + try { + List<FlatFieldDescriptor> ffds = getFlatFields(fieldStr, inType); + // get and add flat field positions + for (FlatFieldDescriptor ffd : ffds) { + readFields = readFields.addField(ffd.getPosition()); + } + } catch (InvalidFieldReferenceException ifre) { + throw new InvalidSemanticAnnotationException("Invalid field reference in read field annotation \"" + fieldStr + "\".", ifre); + + } + } } - } - } - public static SingleInputSemanticProperties getSemanticPropsSingleFromString(String[] constantSet, String[] constantSetExcept, String[] readSet, TypeInformation<?> inType, TypeInformation<?> outType) { - SingleInputSemanticProperties result = new SingleInputSemanticProperties(); - parseConstantFields(constantSet, result, inType, outType); - parseConstantFieldsExcept(constantSetExcept, result, inType, outType); - parseReadFields(readSet, result, inType, outType); - return result; + if (sp instanceof SingleInputSemanticProperties) { + ((SingleInputSemanticProperties) sp).addReadFields(readFields); + } else if (sp instanceof DualInputSemanticProperties) { + ((DualInputSemanticProperties) sp).addReadFields(input, readFields); + } + } } - public static void getSemanticPropsDualFromString(DualInputSemanticProperties target, String[] constantSetFirst, String[] constantSetSecond, String[] constantSetFirstExcept, - String[] constantSetSecondExcept, String[] readFieldsFirst, String[] readFieldsSecond, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) - { - parseConstantFieldsFirst(constantSetFirst, target, inType1, outType); - parseConstantFieldsSecond(constantSetSecond, target, inType2, outType); - parseConstantFieldsFirstExcept(constantSetFirstExcept, target, inType1, outType); - parseConstantFieldsSecondExcept(constantSetSecondExcept, target, inType2, outType); - parseReadFieldsFirst(readFieldsFirst, target, inType1, outType); - parseReadFieldsSecond(readFieldsSecond, target, inType2, outType); - } + ////////////////////// UTIL METHODS /////////////////////////////// - public static DualInputSemanticProperties getSemanticPropsDual(Set<Annotation> set, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) { - if (set == null) { - return null; - } + private static boolean areFieldsCompatible(String sourceField, TypeInformation<?> inType, String targetField, TypeInformation<?> outType) { - Iterator<Annotation> it = set.iterator(); - DualInputSemanticProperties result = null; + // get source type information + TypeInformation<?> sourceType = getExpressionTypeInformation(sourceField, inType); + // get target type information + TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType); - //non tuple types are not yet supported for annotations - if (!inType1.isTupleType() || !inType2.isTupleType() || !outType.isTupleType()) { - return null; - } + return (sourceType.equals(targetType)); + } - while (it.hasNext()) { - if (result == null) { - result = new DualInputSemanticProperties(); + private static TypeInformation<?> getExpressionTypeInformation(String fieldStr, TypeInformation<?> typeInfo) { + Matcher wildcardMatcher = PATTERN_WILDCARD.matcher(fieldStr); + if(wildcardMatcher.matches()) { + return typeInfo; + } else { + Matcher expMatcher = PATTERN_FIELD.matcher(fieldStr); + if(!expMatcher.matches()) { + throw new InvalidFieldReferenceException("Invalid field expression \""+fieldStr+"\"."); } + if(typeInfo instanceof CompositeType<?>) { + return ((CompositeType) typeInfo).getTypeAt(expMatcher.group(1)); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not possible on atomic type ("+typeInfo+")."); + } + } + } - Annotation ann = it.next(); - - if (ann instanceof ConstantFieldsFirst) { - ConstantFieldsFirst cff = (ConstantFieldsFirst) ann; - parseConstantFieldsFirst(cff.value(), result, inType1, outType); - } else if (ann instanceof ConstantFieldsSecond) { - ConstantFieldsSecond cfs = (ConstantFieldsSecond) ann; - parseConstantFieldsSecond(cfs.value(), result, inType2, outType); - } else if (ann instanceof ConstantFieldsFirstExcept) { - ConstantFieldsFirstExcept cffe = (ConstantFieldsFirstExcept) ann; - parseConstantFieldsFirstExcept(cffe.value(), result, inType1, outType); - } else if (ann instanceof ConstantFieldsSecondExcept) { - ConstantFieldsSecondExcept cfse = (ConstantFieldsSecondExcept) ann; - parseConstantFieldsSecondExcept(cfse.value(), result, inType2, outType); - } else if (ann instanceof ReadFieldsFirst) { - ReadFieldsFirst rff = (ReadFieldsFirst) ann; - parseReadFieldsFirst(rff.value(), result, inType1, outType); - } else if (ann instanceof ReadFieldsSecond) { - ReadFieldsSecond rfs = (ReadFieldsSecond) ann; - parseReadFieldsSecond(rfs.value(), result, inType2, outType); + private static List<FlatFieldDescriptor> getFlatFields(String fieldStr, TypeInformation<?> typeInfo) { + if(typeInfo instanceof CompositeType<?>) { + return ((CompositeType) typeInfo).getFlatFields(fieldStr); + } else { + Matcher wildcardMatcher = PATTERN_WILDCARD.matcher(fieldStr); + if(wildcardMatcher.matches()) { + return Collections.singletonList(new FlatFieldDescriptor(0, typeInfo)); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not possible on atomic type ("+typeInfo+")."); } } - return result; } + }
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java index 2411021..9ea28f7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java @@ -68,7 +68,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, this.defaultName = defaultName; this.hint = hint; } - + @Override protected CrossFunction<I1, I2, OUT> getFunction() { return function; @@ -322,19 +322,20 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, } @Override - public CrossOperator<I1, I2, OUT> withConstantSetFirst(String... constantSetFirst) { - throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated."); + public CrossOperator<I1, I2, OUT> withForwardedFieldsFirst(String... forwardedFieldsFirst) { + throw new InvalidProgramException("The semantic properties (forwarded fields) are automatically calculated."); } @Override - public CrossOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) { - throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated."); + public CrossOperator<I1, I2, OUT> withForwardedFieldsSecond(String... forwardedFieldsSecond) { + throw new InvalidProgramException("The semantic properties (forwarded fields) are automatically calculated."); } @Override protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) { // we do not extract anything, but construct the properties from the projection - return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst()); + return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst(), + getInput1Type(), getInput2Type()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index d30dc5e..d570fc2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.functions.RichGroupReduceFunction; @@ -96,7 +95,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); for (int field : keys.computeLogicalKeyPositions()) { - sProps.setForwardedField(field, new FieldSet(field)); + sProps.addForwardedField(field, field); } po.setSemanticProperties(sProps); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 822759f..e60314f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -39,6 +39,8 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; @@ -230,7 +232,27 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, return super.extractSemanticAnnotationsFromUdf(function.getClass()); } } - + + @Override + protected boolean udfWithForwardedFieldsFirstAnnotation(Class<?> udfClass) { + + if (function instanceof DefaultJoin.WrappingFlatJoinFunction) { + return super.udfWithForwardedFieldsFirstAnnotation(((WrappingFunction<?>) function).getWrappedFunction().getClass()); + } else { + return super.udfWithForwardedFieldsFirstAnnotation(function.getClass()); + } + } + + @Override + protected boolean udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) { + + if (function instanceof DefaultJoin.WrappingFlatJoinFunction) { + return super.udfWithForwardedFieldsSecondAnnotation(((WrappingFunction<?>) function).getWrappedFunction().getClass()); + } else { + return super.udfWithForwardedFieldsSecondAnnotation(function.getClass()); + } + } + @Override protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { @@ -708,19 +730,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, } @Override - public JoinOperator<I1, I2, OUT> withConstantSetFirst(String... constantSetFirst) { - throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated."); + public JoinOperator<I1, I2, OUT> withForwardedFieldsFirst(String... forwardedFieldsFirst) { + throw new InvalidProgramException("The semantic properties (forwarded fields) are automatically calculated."); } @Override - public JoinOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) { - throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated."); + public JoinOperator<I1, I2, OUT> withForwardedFieldsSecond(String... forwardedFieldsSecond) { + throw new InvalidProgramException("The semantic properties (forwarded fields) are automatically calculated."); } @Override protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) { // we do not extract the annotation, we construct the properties from the projection# - return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst()); + return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst(), + getInput1Type(), getInput2Type()); } } @@ -968,7 +991,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, // -------------------------------------------------------------------------------------------- // default join functions // -------------------------------------------------------------------------------------------- - + + @ForwardedFieldsFirst("*->0") + @ForwardedFieldsSecond("*->1") public static final class DefaultFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 5fca38a..30ff91a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -125,8 +125,7 @@ public abstract class Keys<T> { if(keyType.isTupleType()) { // special case again: TupleTypeInfoBase<?> tupleKeyType = (TupleTypeInfoBase<?>) keyType; - List<FlatFieldDescriptor> keyTypeFields = new ArrayList<FlatFieldDescriptor>(tupleKeyType.getTotalFields()); - tupleKeyType.getKey(ExpressionKeys.SELECT_ALL_CHAR, 0, keyTypeFields); + List<FlatFieldDescriptor> keyTypeFields = tupleKeyType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR); if(expressionKeys.keyFields.size() != keyTypeFields.size()) { throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); } @@ -207,7 +206,7 @@ public abstract class Keys<T> { // int-defined field public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean allowEmpty) { if (!type.isTupleType()) { - throw new InvalidProgramException("Specifying keys via field positions is only valid" + + throw new InvalidProgramException("Specifying keys via field positions is only valid " + "for tuple data types. Type: " + type); } @@ -288,8 +287,7 @@ public abstract class Keys<T> { // extract the keys on their flat position keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length); for (int i = 0; i < expressions.length; i++) { - List<FlatFieldDescriptor> keys = new ArrayList<FlatFieldDescriptor>(); // use separate list to do a size check - cType.getKey(expressions[i], 0, keys); + List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check if(keys.size() == 0) { throw new IllegalArgumentException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType); } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java index 0b2aa95..bddef8f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.translation.PlanProjectOperator; @@ -72,7 +73,7 @@ public class ProjectOperator<IN, OUT extends Tuple> ppo.setInput(input); // set dop ppo.setDegreeOfParallelism(this.getParallelism()); - ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields)); + ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields, (CompositeType<?>) getInputType())); return ppo; } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java index 0d0cb15..f55489f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -105,27 +106,74 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp } /** - * Adds a constant-set annotation for the UDF. - * * <p> - * Constant set annotations are used by the optimizer to infer the existence of data properties (sorted, partitioned, grouped). - * In certain cases, these annotations allow the optimizer to generate a more efficient execution plan which can lead to improved performance. - * Constant set annotations can only be specified if the second input and the output type of the UDF are of - * {@link org.apache.flink.api.java.tuple.Tuple} data types. - * + * Adds semantic information about forwarded fields of the user-defined function. + * The forwarded fields information declares fields which are never modified by the function and + * which are forwarded at the same position to the output or unchanged copied to another position in the output. + * </p> + * * <p> - * A constant-set annotation is a set of constant field specifications. The constant field specification String "4->3" specifies, that this UDF copies the fourth field of - * an input tuple to the third field of the output tuple. Field references are zero-indexed. - * + * Fields that are forwarded at the same position are specified by their position. + * The specified position must be valid for the input and output data type and have the same type. + * For example <code>withForwardedFields("f2")</code> declares that the third field of a Java input tuple is + * copied to the third field of an output tuple. + * </p> + * * <p> - * <b>NOTICE: Constant set annotations are optional, but if given need to be correct. Otherwise, the program might produce wrong results!</b> - * - * @param constantSet A list of constant field specification Strings. - * @return This operator with an annotated constant field set. + * Fields which are unchanged copied to another position in the output are declared by specifying the + * source field reference in the input and the target field reference in the output. + * <code>withForwardedFields("f0->f2")</code> denotes that the first field of the Java input tuple is + * unchanged copied to the third field of the Java output tuple. When using a wildcard ("*") ensure that + * the number of declared fields and their types in input and output type match. + * </p> + * + * <p> + * Multiple forwarded fields can be annotated in one (<code>withForwardedFields("f2; f3->f0; f4")</code>) + * or separate Strings (<code>withForwardedFields("f2", "f3->f0", "f4")</code>). + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field references such as nested fields and wildcard. + * </p> + * + * <p> + * It is not possible to override existing semantic information about forwarded fields which was + * for example added by a {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} class annotation. + * </p> + * + * <p> + * <b>NOTE: Adding semantic information for functions is optional! + * If used correctly, semantic information can help the Flink optimizer to generate more efficient execution plans. + * However, incorrect semantic information can cause the optimizer to generate incorrect execution plans which compute wrong results! + * So be careful when adding semantic information. + * </b> + * </p> + * + * @param forwardedFields A list of field forward expressions. + * @return This operator with annotated forwarded field information. + * + * @see org.apache.flink.api.java.functions.FunctionAnnotation + * @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields */ - public O withConstantSet(String... constantSet) { - SingleInputSemanticProperties props = SemanticPropUtil.getSemanticPropsSingleFromString(constantSet, null, null, this.getInputType(), this.getResultType()); - this.setSemanticProperties(props); + public O withForwardedFields(String... forwardedFields) { + + if(this.udfSemantics == null) { + // extract semantic properties from function annotations + this.udfSemantics = extractSemanticAnnotations(getFunction().getClass()); + } + + if(this.udfSemantics == null) { + this.udfSemantics = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType()); + } else { + if(udfWithForwardedFieldsAnnotation(getFunction().getClass())) { + // refuse semantic information as it would override the function annotation + throw new SemanticProperties.InvalidSemanticAnnotationException("Forwarded field information " + + "has already been added by a function annotation for this operator. " + + "Cannot overwrite function annotations."); + } else { + SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType()); + } + } + @SuppressWarnings("unchecked") O returnType = (O) this; return returnType; @@ -263,9 +311,9 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp @Override public SingleInputSemanticProperties getSemanticProperties() { - if (udfSemantics == null) { + if (this.udfSemantics == null) { SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass()); - udfSemantics = props != null ? props : new SingleInputSemanticProperties(); + this.udfSemantics = props != null ? props : new SingleInputSemanticProperties(); } return this.udfSemantics; @@ -284,7 +332,17 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp } protected SingleInputSemanticProperties extractSemanticAnnotations(Class<?> udfClass) { - Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(udfClass); + Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(udfClass); return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType()); } + + protected boolean udfWithForwardedFieldsAnnotation(Class<?> udfClass) { + + if (udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null || + udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null) { + return true; + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java index 2de9282..91f9f7e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.functions.SemanticPropUtil; @@ -107,64 +108,150 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp } /** - * Adds a constant-set annotation for the first input of the UDF. - * * <p> - * Constant set annotations are used by the optimizer to infer the existence of data properties (sorted, partitioned, grouped). - * In certain cases, these annotations allow the optimizer to generate a more efficient execution plan which can lead to improved performance. - * Constant set annotations can only be specified if the first input and the output type of the UDF are of - * {@link org.apache.flink.api.java.tuple.Tuple} data types. - * + * Adds semantic information about forwarded fields of the first input of the user-defined function. + * The forwarded fields information declares fields which are never modified by the function and + * which are forwarded at the same position to the output or unchanged copied to another position in the output. + * </p> + * * <p> - * A constant-set annotation is a set of constant field specifications. The constant field specification String "4->3" specifies, that this UDF copies the fourth field of - * an input tuple to the third field of the output tuple. Field references are zero-indexed. - * + * Fields that are forwarded at the same position are specified by their position. + * The specified position must be valid for the input and output data type and have the same type. + * For example <code>withForwardedFieldsFirst("f2")</code> declares that the third field of a Java input tuple + * from the first input is copied to the third field of an output tuple. + * </p> + * * <p> - * <b>NOTICE: Constant set annotations are optional, but if given need to be correct. Otherwise, the program might produce wrong results!</b> - * - * @param constantSetFirst A list of constant field specification Strings for the first input. - * @return This operator with an annotated constant field set for the first input. + * Fields which are unchanged copied from the first input to another position in the output are declared + * by specifying the source field reference in the first input and the target field reference in the output. + * <code>withForwardedFieldsFirst("f0->f2")</code> denotes that the first field of the first input Java tuple is + * unchanged copied to the third field of the Java output tuple. When using a wildcard ("*") ensure that + * the number of declared fields and their types in first input and output type match. + * </p> + * + * <p> + * Multiple forwarded fields can be annotated in one (<code>withForwardedFieldsFirst("f2; f3->f0; f4")</code>) + * or separate Strings (<code>withForwardedFieldsFirst("f2", "f3->f0", "f4")</code>). + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field references such as nested fields and wildcard. + * </p> + * + * <p> + * It is not possible to override existing semantic information about forwarded fields of the first input which was + * for example added by a {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} class annotation. + * </p> + * + * <p> + * <b>NOTE: Adding semantic information for functions is optional! + * If used correctly, semantic information can help the Flink optimizer to generate more efficient execution plans. + * However, incorrect semantic information can cause the optimizer to generate incorrect execution plans which compute wrong results! + * So be careful when adding semantic information. + * </b> + * </p> + * + * @param forwardedFieldsFirst A list of forwarded field expressions for the first input of the function. + * @return This operator with annotated forwarded field information. + * + * @see org.apache.flink.api.java.functions.FunctionAnnotation + * @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst */ @SuppressWarnings("unchecked") - public O withConstantSetFirst(String... constantSetFirst) { + public O withForwardedFieldsFirst(String... forwardedFieldsFirst) { if (this.udfSemantics == null) { + // extract semantic properties from function annotations + this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass()); + } + + if(this.udfSemantics == null) { this.udfSemantics = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, forwardedFieldsFirst, null, + null, null, null, null, getInput1Type(), getInput2Type(), getResultType()); + } else { + if(this.udfWithForwardedFieldsFirstAnnotation(getFunction().getClass())) { + // refuse semantic information as it would override the function annotation + throw new SemanticProperties.InvalidSemanticAnnotationException("Forwarded field information " + + "has already been added by a function annotation for the first input of this operator. " + + "Cannot overwrite function annotations."); + } else { + SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, forwardedFieldsFirst, null, + null, null, null, null, getInput1Type(), getInput2Type(), getResultType()); + } } - - SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, constantSetFirst, null, - null, null, null, null, this.getInput1Type(), this.getInput2Type(), this.getResultType()); O returnType = (O) this; return returnType; } - + /** - * Adds a constant-set annotation for the second input of the UDF. - * * <p> - * Constant set annotations are used by the optimizer to infer the existence of data properties (sorted, partitioned, grouped). - * In certain cases, these annotations allow the optimizer to generate a more efficient execution plan which can lead to improved performance. - * Constant set annotations can only be specified if the second input and the output type of the UDF are of - * {@link org.apache.flink.api.java.tuple.Tuple} data types. - * + * Adds semantic information about forwarded fields of the second input of the user-defined function. + * The forwarded fields information declares fields which are never modified by the function and + * which are forwarded at the same position to the output or unchanged copied to another position in the output. + * </p> + * * <p> - * A constant-set annotation is a set of constant field specifications. The constant field specification String "4->3" specifies, that this UDF copies the fourth field of - * an input tuple to the third field of the output tuple. Field references are zero-indexed. - * + * Fields that are forwarded at the same position are specified by their position. + * The specified position must be valid for the input and output data type and have the same type. + * For example <code>withForwardedFieldsSecond("f2")</code> declares that the third field of a Java input tuple + * from the second input is copied to the third field of an output tuple. + * </p> + * * <p> - * <b>NOTICE: Constant set annotations are optional, but if given need to be correct. Otherwise, the program might produce wrong results!</b> - * - * @param constantSetSecond A list of constant field specification Strings for the second input. - * @return This operator with an annotated constant field set for the second input. + * Fields which are unchanged copied from the second input to another position in the output are declared + * by specifying the source field reference in the second input and the target field reference in the output. + * <code>withForwardedFieldsSecond("f0->f2")</code> denotes that the first field of the second input Java tuple is + * unchanged copied to the third field of the Java output tuple. When using a wildcard ("*") ensure that + * the number of declared fields and their types in second input and output type match. + * </p> + * + * <p> + * Multiple forwarded fields can be annotated in one (<code>withForwardedFieldsSecond("f2; f3->f0; f4")</code>) + * or separate Strings (<code>withForwardedFieldsSecond("f2", "f3->f0", "f4")</code>). + * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * details on field references such as nested fields and wildcard. + * </p> + * + * <p> + * It is not possible to override existing semantic information about forwarded fields of the second input which was + * for example added by a {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} class annotation. + * </p> + * + * <p> + * <b>NOTE: Adding semantic information for functions is optional! + * If used correctly, semantic information can help the Flink optimizer to generate more efficient execution plans. + * However, incorrect semantic information can cause the optimizer to generate incorrect execution plans which compute wrong results! + * So be careful when adding semantic information. + * </b> + * </p> + * + * @param forwardedFieldsSecond A list of forwarded field expressions for the second input of the function. + * @return This operator with annotated forwarded field information. + * + * @see org.apache.flink.api.java.functions.FunctionAnnotation + * @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond */ @SuppressWarnings("unchecked") - public O withConstantSetSecond(String... constantSetSecond) { + public O withForwardedFieldsSecond(String... forwardedFieldsSecond) { if (this.udfSemantics == null) { + // extract semantic properties from function annotations + this.udfSemantics = extractSemanticAnnotationsFromUdf(getFunction().getClass()); + } + + if(this.udfSemantics == null) { this.udfSemantics = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, forwardedFieldsSecond, + null, null, null, null, getInput1Type(), getInput2Type(), getResultType()); + } else { + if(udfWithForwardedFieldsSecondAnnotation(getFunction().getClass())) { + // refuse semantic information as it would override the function annotation + throw new SemanticProperties.InvalidSemanticAnnotationException("Forwarded field information " + + "has already been added by a function annotation for the second input of this operator. " + + "Cannot overwrite function annotations."); + } else { + SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, forwardedFieldsSecond, + null, null, null, null, getInput1Type(), getInput2Type(), getResultType()); + } } - - SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, constantSetSecond, - null, null, null, null, this.getInput1Type(), this.getInput2Type(), this.getResultType()); O returnType = (O) this; return returnType; @@ -303,9 +390,9 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp @Override public DualInputSemanticProperties getSemanticProperties() { - if (udfSemantics == null) { + if (this.udfSemantics == null) { DualInputSemanticProperties props = extractSemanticAnnotationsFromUdf(getFunction().getClass()); - udfSemantics = props != null ? props : new DualInputSemanticProperties(); + this.udfSemantics = props != null ? props : new DualInputSemanticProperties(); } return this.udfSemantics; @@ -325,7 +412,28 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) { - Set<Annotation> annotations = FunctionAnnotation.readDualConstantAnnotations(udfClass); + Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(udfClass); return SemanticPropUtil.getSemanticPropsDual(annotations, getInput1Type(), getInput2Type(), getResultType()); } + + protected boolean udfWithForwardedFieldsFirstAnnotation(Class<?> udfClass) { + + if (udfClass.getAnnotation(FunctionAnnotation.ForwardedFieldsFirst.class) != null || + udfClass.getAnnotation(FunctionAnnotation.NonForwardedFieldsFirst.class) != null) { + return true; + } else { + return false; + } + } + + protected boolean udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) { + + if (udfClass.getAnnotation(FunctionAnnotation.ForwardedFieldsSecond.class) != null || + udfClass.getAnnotation(FunctionAnnotation.NonForwardedFieldsSecond.class) != null) { + return true; + } else { + return false; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java index 15b4719..1e99e2d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java @@ -18,11 +18,12 @@ package org.apache.flink.api.java.operators.translation; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; - +@ForwardedFields("*->1") public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java index e139697..e61c0de 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java @@ -19,9 +19,10 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; - +@ForwardedFields("1->*") public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java index 45fb74f..77dbad6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java @@ -23,9 +23,10 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.FilterOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.util.Collector; - +@ForwardedFields("*") public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> { public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) { http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java index 1e73b92..d7a17b8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java @@ -248,8 +248,7 @@ public class FunctionAnnotation { public @interface ConstantFieldsSecondExcept { int[] value(); } - - + /** * Private constructor to prevent instantiation. This class is intended only as a container. */ @@ -278,8 +277,7 @@ public class FunctionAnnotation { // extract notConstantSet from annotation if (allConstants != null) { - FieldSet nonConstant = new FieldSet(); - return new ImplicitlyForwardingSingleInputSemanticProperties(nonConstant); + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); } SingleInputSemanticProperties semanticProperties = new SingleInputSemanticProperties(); @@ -329,13 +327,13 @@ public class FunctionAnnotation { // extract readSets from annotations if (constantSet1Annotation != null) { for(int value: constantSet1Annotation.value()) { - semanticProperties.addForwardedField1(value, value); + semanticProperties.addForwardedField(0, value, value); } } if (constantSet2Annotation != null) { for(int value: constantSet2Annotation.value()) { - semanticProperties.addForwardedField2(value, value); + semanticProperties.addForwardedField(1, value, value); } } @@ -344,73 +342,66 @@ public class FunctionAnnotation { private static final class ImplicitlyForwardingSingleInputSemanticProperties extends SingleInputSemanticProperties { - private static final long serialVersionUID = 1L; + + private static final long serialVersionUID = 1l; private FieldSet nonForwardedFields; private ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) { this.nonForwardedFields = nonForwardedFields; - addWrittenFields(nonForwardedFields); } - - /** - * Returns the logical position where the given field is written to. - * In this variant of the semantic properties, all fields are assumed implicitly forwarded, - * unless stated otherwise. We return the same field position, unless the field is explicitly - * marked as modified. - */ @Override - public FieldSet getForwardedField(int sourceField) { - if (isAllFieldsConstant()) { - return new FieldSet(sourceField); + public FieldSet getForwardingTargetFields(int input, int sourceField) { + + if (input != 0) { + throw new IndexOutOfBoundsException(); } - if (this.nonForwardedFields.contains(sourceField)) { - return null; + if (nonForwardedFields == null) { + return super.getForwardingTargetFields(input, sourceField); } else { - return new FieldSet(sourceField); + if (this.nonForwardedFields.contains(sourceField)) { + return FieldSet.EMPTY_SET; + } else { + return new FieldSet(sourceField); + } } } @Override - public FieldSet getSourceField(int input, int field) { + public int getForwardingSourceField(int input, int targetField) { + if (input != 0) { throw new IndexOutOfBoundsException(); } - if (isAllFieldsConstant()) { - return new FieldSet(field); - } - - if (this.nonForwardedFields == null) { - return super.getSourceField(input, field); - } - - if (this.nonForwardedFields.contains(field)) { - return null; + if (nonForwardedFields == null) { + return super.getForwardingSourceField(input, targetField); } else { - return new FieldSet(field); + if (this.nonForwardedFields.contains(targetField)) { + return -1; + } else { + return targetField; + } } } @Override - public void addForwardedField(int sourceField, int destinationField) { - throw new UnsupportedOperationException("When defining fields as implicitly constant " + - "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields."); + public FieldSet getReadFields(int input) { + return null; } @Override - public void addForwardedField(int sourceField, FieldSet destinationFields) { - throw new UnsupportedOperationException("When defining fields as implicitly constant " + - "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields."); + public void addForwardedField(int sourceField, int destinationField) { + if (this.nonForwardedFields == null) { + super.addForwardedField(sourceField, destinationField); + } else { + throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + + "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); + } } - @Override - public void setForwardedField(int sourceField, FieldSet destinationFields) { - throw new UnsupportedOperationException("When defining fields as implicitly constant " + - "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields."); - } } private static final class ImplicitlyForwardingTwoInputSemanticProperties extends DualInputSemanticProperties { @@ -421,7 +412,6 @@ public class FunctionAnnotation { private ImplicitlyForwardingTwoInputSemanticProperties() {} - public void setImplicitlyForwardingFirstExcept(FieldSet nonForwardedFields) { this.nonForwardedFields1 = nonForwardedFields; } @@ -430,136 +420,63 @@ public class FunctionAnnotation { this.nonForwardedFields2 = nonForwardedFields; } - @Override - public FieldSet getForwardedField1(int sourceField) { - if (isAllFieldsConstant()) { - return new FieldSet(sourceField); - } + public FieldSet getForwardingTargetFields(int input, int sourceField) { - if (this.nonForwardedFields1 == null) { - return super.getForwardedField1(sourceField); - } - else { - if (this.nonForwardedFields1.contains(sourceField)) { - return null; - } else { - return new FieldSet(sourceField); - } - } - } + if(input != 0 && input != 1) { + throw new IndexOutOfBoundsException(); + } else if (input == 0) { - @Override - public FieldSet getForwardedField2(int sourceField) { - if (isAllFieldsConstant()) { - return new FieldSet(sourceField); - } + if (this.nonForwardedFields1 == null) { + return super.getForwardingTargetFields(0, sourceField); + } + else { + if (this.nonForwardedFields1.contains(sourceField)) { + return FieldSet.EMPTY_SET; + } else { + return new FieldSet(sourceField); + } + } + } else { - if (this.nonForwardedFields2 == null) { - return super.getForwardedField2(sourceField); - } - else { - if (this.nonForwardedFields2.contains(sourceField)) { - return null; - } else { - return new FieldSet(sourceField); + if (this.nonForwardedFields2 == null) { + return super.getForwardingTargetFields(1, sourceField); + } + else { + if (this.nonForwardedFields2.contains(sourceField)) { + return FieldSet.EMPTY_SET; + } else { + return new FieldSet(sourceField); + } } } } @Override - public FieldSet getSourceField(int input, int field) { + public void addForwardedField(int input, int sourceField, int destinationField) { if (input != 0 && input != 1) { throw new IndexOutOfBoundsException(); - } - - if (isAllFieldsConstant()) { - return new FieldSet(field); - } - - if (this.nonForwardedFields1 == null && this.nonForwardedFields2 == null) { - return super.getSourceField(input, field); - } - - if (input == 0 && this.nonForwardedFields1 != null && this.nonForwardedFields1.contains(field)) { - return null; } else if (input == 0) { - return new FieldSet(field); - } - - if (input == 1 && this.nonForwardedFields2 != null && this.nonForwardedFields2.contains(field)) { - return null; - } else if (input == 1) { - return new FieldSet(field); - } - - return null; - } - - @Override - public void addForwardedField1(int sourceField, int destinationField) { - if (this.nonForwardedFields1 == null) { - super.addForwardedField1(sourceField, destinationField); - } - else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); - } - } - - @Override - public void addForwardedField1(int sourceField, FieldSet destinationFields) { - if (this.nonForwardedFields1 == null) { - super.addForwardedField1(sourceField, destinationFields); - } - else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); - } - } - - @Override - public void setForwardedField1(int sourceField, FieldSet destinationFields) { - if (this.nonForwardedFields1 == null) { - super.addForwardedField1(sourceField, destinationFields); - } - else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); - } - } - - @Override - public void addForwardedField2(int sourceField, int destinationField) { - if (this.nonForwardedFields2 == null) { - super.addForwardedField2(sourceField, destinationField); - } - else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields."); + if (this.nonForwardedFields1 == null) { + super.addForwardedField(0, sourceField, destinationField); + } else { + throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + + "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); + } + } else { + if (this.nonForwardedFields2 == null) { + super.addForwardedField(1, sourceField, destinationField); + } else { + throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + + "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); + } } } @Override - public void addForwardedField2(int sourceField, FieldSet destinationFields) { - if (this.nonForwardedFields2 == null) { - super.addForwardedField2(sourceField, destinationFields); - } - else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields."); - } + public FieldSet getReadFields(int input) { + return null; } - @Override - public void setForwardedField2(int sourceField, FieldSet destinationFields) { - if (this.nonForwardedFields2 == null) { - super.addForwardedField2(sourceField, destinationFields); - } - else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields."); - } - } } }
