http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 0103a7b..a6cc8bd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -25,9 +25,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import org.apache.commons.lang3.Validate; -import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -45,6 +45,15 @@ import com.google.common.base.Joiner; */ public class PojoTypeInfo<T> extends CompositeType<T>{ + private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; + private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; + private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; + + private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); + private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); + private final Class<T> typeClass; private PojoField[] fields; @@ -103,62 +112,119 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ return Comparable.class.isAssignableFrom(typeClass); } + @Override - public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { - // handle 'select all' first - if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + if(!matcher.matches()) { + throw new InvalidFieldReferenceException("Invalid POJO field reference \""+fieldExpression+"\"."); + } + + String field = matcher.group(0); + if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + // handle select all int keyPosition = 0; - for(PojoField field : fields) { - if(field.type instanceof AtomicType) { - result.add(new FlatFieldDescriptor(offset + keyPosition, field.type)); - } else if(field.type instanceof CompositeType) { - CompositeType<?> cType = (CompositeType<?>)field.type; - cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + for(PojoField pField : fields) { + if(pField.type instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>)pField.type; + cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); keyPosition += cType.getTotalFields()-1; } else { - throw new RuntimeException("Unexpected key type: "+field.type); + result.add(new NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, pField.type)); } keyPosition++; } return; + } else { + field = matcher.group(1); + } + + // get field + int fieldPos = -1; + TypeInformation<?> fieldType = null; + for (int i = 0; i < fields.length; i++) { + if (fields[i].field.getName().equals(field)) { + fieldPos = i; + fieldType = fields[i].type; + break; + } + } + if (fieldPos == -1) { + throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); } - Validate.notEmpty(fieldExpression, "Field expression must not be empty."); - // if there is a dot try getting the field from that sub field - int firstDot = fieldExpression.indexOf('.'); - if (firstDot == -1) { - // this is the last field (or only field) in the field expression - int fieldId = 0; - for (int i = 0; i < fields.length; i++) { - if(fields[i].type instanceof CompositeType) { - fieldId += fields[i].type.getTotalFields()-1; + String tail = matcher.group(3); + if(tail == null) { + if(fieldType instanceof CompositeType) { + // forward offset + for(int i=0; i<fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); } - if (fields[i].field.getName().equals(fieldExpression)) { - if(fields[i].type instanceof CompositeType) { - throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n" - + "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type"); - } - result.add(new FlatFieldDescriptor(offset + fieldId, fields[i].type)); - return; + // add all fields of composite type + ((CompositeType) fieldType).getFlatFields("*", offset, result); + return; + } else { + // we found the field to add + // compute flat field position by adding skipped fields + int flatFieldPos = offset; + for(int i=0; i<fieldPos; i++) { + flatFieldPos += this.getTypeAt(i).getTotalFields(); } - fieldId++; + result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); + // nothing left to do + return; } } else { - // split and go deeper - String firstField = fieldExpression.substring(0, firstDot); - String rest = fieldExpression.substring(firstDot + 1); - int fieldId = 0; - for (int i = 0; i < fields.length; i++) { - if (fields[i].field.getName().equals(firstField)) { - if (!(fields[i].type instanceof CompositeType<?>)) { - throw new RuntimeException("Field "+fields[i].type+" (specified by '"+fieldExpression+"') is not a composite type"); - } - CompositeType<?> cType = (CompositeType<?>) fields[i].type; - cType.getKey(rest, offset + fieldId, result); // recurse - return; + if(fieldType instanceof CompositeType<?>) { + // forward offset + for(int i=0; i<fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); } - fieldId += fields[i].type.getTotalFields(); + ((CompositeType) fieldType).getFlatFields(tail, offset, result); + // nothing left to do + return; + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); + } + } + } + + public TypeInformation<?> getTypeAt(String fieldExpression) { + + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if(!matcher.matches()) { + if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of POJO field expression \""+fieldExpression+"\"."); + } + } + + String field = matcher.group(1); + // get field + int fieldPos = -1; + TypeInformation<?> fieldType = null; + for (int i = 0; i < fields.length; i++) { + if (fields[i].field.getName().equals(field)) { + fieldPos = i; + fieldType = fields[i].type; + break; + } + } + if (fieldPos == -1) { + throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); + } + + String tail = matcher.group(3); + if(tail == null) { + // we found the type + return fieldType; + } else { + if(fieldType instanceof CompositeType<?>) { + return ((CompositeType) fieldType).getTypeAt(tail); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); } - throw new RuntimeException("Unable to find field "+fieldExpression+" in type "+this+" (looking for '"+firstField+"')"); } } @@ -248,4 +314,23 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]" + ">"; } + + public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor { + + private String fieldName; + + public NamedFlatFieldDescriptor(String name, int keyPosition, TypeInformation<?> type) { + super(keyPosition, type); + this.fieldName = name; + } + + public String getFieldName() { + return fieldName; + } + + @Override + public String toString() { + return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; + } + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index 22f7942..effec2b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -20,23 +20,32 @@ package org.apache.flink.api.java.typeutils; import java.util.Arrays; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import com.google.common.base.Preconditions; - public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { - + + private final static String REGEX_FIELD = "(f?)([0-9]+)"; + private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; + private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; + + private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_FIELD); + private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); + private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); + protected final TypeInformation<?>[] types; protected final Class<T> tupleType; private int totalFields; - + public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) { super(tupleType); this.tupleType = tupleType; @@ -90,82 +99,114 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { startKeyId += type.getTotalFields(); } } - @Override - public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { - // handle 'select all' - if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + if (!matcher.matches()) { + throw new InvalidFieldReferenceException("Invalid tuple field reference \""+fieldExpression+"\"."); + } + + String field = matcher.group(0); + if (field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + // handle select all int keyPosition = 0; - for(TypeInformation<?> type : types) { - if(type instanceof AtomicType) { - result.add(new FlatFieldDescriptor(offset + keyPosition, type)); - } else if(type instanceof CompositeType) { - CompositeType<?> cType = (CompositeType<?>)type; - cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); - keyPosition += cType.getTotalFields()-1; + for (TypeInformation<?> type : types) { + if (type instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>) type; + cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + keyPosition += cType.getTotalFields() - 1; } else { - throw new RuntimeException("Unexpected key type: "+type); + result.add(new FlatFieldDescriptor(offset + keyPosition, type)); } keyPosition++; } return; } - // check input - if(fieldExpression.length() < 2) { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. The length must be at least 2"); - } - if(fieldExpression.charAt(0) != 'f') { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect for a Tuple type. It has to start with an 'f'"); - } - // get first component of nested expression - int dotPos = fieldExpression.indexOf('.'); - String nestedSplitFirst = fieldExpression; - if(dotPos != -1 ) { - Preconditions.checkArgument(dotPos != fieldExpression.length()-1, "The field expression can never end with a dot."); - nestedSplitFirst = fieldExpression.substring(0, dotPos); - } - String fieldNumStr = nestedSplitFirst.substring(1, nestedSplitFirst.length()); - if(!StringUtils.isNumeric(fieldNumStr)) { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric"); - } - int pos = -1; - try { - pos = Integer.valueOf(fieldNumStr); - } catch(NumberFormatException nfe) { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric", nfe); + + String fieldStr = matcher.group(1); + Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); + if (!fieldMatcher.matches()) { + throw new RuntimeException("Invalid matcher pattern"); } - if(pos < 0) { - throw new IllegalArgumentException("Negative position is not possible"); + field = fieldMatcher.group(2); + int fieldPos = Integer.valueOf(field); + + if (fieldPos >= this.getArity()) { + throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + "."); } - // pass down the remainder (after the dot) of the fieldExpression to the type at that position. - if(dotPos != -1) { // we need to go deeper - String rem = fieldExpression.substring(dotPos+1); - if( !(types[pos] instanceof CompositeType<?>) ) { - throw new RuntimeException("Element at position "+pos+" is not a composite type. There are no nested types to select"); + TypeInformation<?> fieldType = this.getTypeAt(fieldPos); + String tail = matcher.group(5); + if(tail == null) { + if(fieldType instanceof CompositeType) { + // forward offsets + for(int i=0; i<fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + // add all fields of composite type + ((CompositeType) fieldType).getFlatFields("*", offset, result); + return; + } else { + // we found the field to add + // compute flat field position by adding skipped fields + int flatFieldPos = offset; + for(int i=0; i<fieldPos; i++) { + flatFieldPos += this.getTypeAt(i).getTotalFields(); + } + result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); + // nothing left to do + return; } - CompositeType<?> cType = (CompositeType<?>) types[pos]; - // count nested fields before "pos" - for (int i = 0; i < pos; i++) { - offset += types[i].getTotalFields(); + } else { + if(fieldType instanceof CompositeType<?>) { + // forward offset + for(int i=0; i<fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + ((CompositeType) fieldType).getFlatFields(tail, offset, result); + // nothing left to do + return; + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); } - cType.getKey(rem, offset, result); - return; } - - if(pos >= types.length) { - throw new IllegalArgumentException("The specified tuple position does not exist"); + } + + public <X> TypeInformation<X> getTypeAt(String fieldExpression) { + + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if(!matcher.matches()) { + if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of tuple field expression \""+fieldExpression+"\"."); + } } - - // count nested fields before "pos". - for(int i = 0; i < pos; i++) { - offset += types[i].getTotalFields() - 1; // this adds only something to offset if its a composite type. + + String fieldStr = matcher.group(1); + Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); + if(!fieldMatcher.matches()) { + throw new RuntimeException("Invalid matcher pattern"); } - if(types[pos] instanceof CompositeType) { - throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n" - + "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type"); + String field = fieldMatcher.group(2); + int fieldPos = Integer.valueOf(field); + + if(fieldPos >= this.getArity()) { + throw new InvalidFieldReferenceException("Tuple field expression \""+fieldStr+"\" out of bounds of "+this.toString()+"."); + } + TypeInformation<X> fieldType = this.getTypeAt(fieldPos); + String tail = matcher.group(5); + if(tail == null) { + // we found the type + return fieldType; + } else { + if(fieldType instanceof CompositeType<?>) { + return ((CompositeType) fieldType).getTypeAt(tail); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); + } } - result.add(new FlatFieldDescriptor(offset + pos, types[pos])); } public <X> TypeInformation<X> getTypeAt(int pos) {
