http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 0103a7b..a6cc8bd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -25,9 +25,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -45,6 +45,15 @@ import com.google.common.base.Joiner;
  */
 public class PojoTypeInfo<T> extends CompositeType<T>{
 
+       private final static String REGEX_FIELD = 
"[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
+       private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
+       private final static String REGEX_NESTED_FIELDS_WILDCARD = 
REGEX_NESTED_FIELDS
+                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+       private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
+       private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
        private final Class<T> typeClass;
 
        private PojoField[] fields;
@@ -103,62 +112,119 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
                return Comparable.class.isAssignableFrom(typeClass);
        }
        
+
        @Override
-       public void getKey(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
-               // handle 'select all' first
-               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+       public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
+
+               Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+               if(!matcher.matches()) {
+                       throw new InvalidFieldReferenceException("Invalid POJO 
field reference \""+fieldExpression+"\".");
+               }
+
+               String field = matcher.group(0);
+               if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+                       // handle select all
                        int keyPosition = 0;
-                       for(PojoField field : fields) {
-                               if(field.type instanceof AtomicType) {
-                                       result.add(new 
FlatFieldDescriptor(offset + keyPosition, field.type));
-                               } else if(field.type instanceof CompositeType) {
-                                       CompositeType<?> cType = 
(CompositeType<?>)field.type;
-                                       
cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
+                       for(PojoField pField : fields) {
+                               if(pField.type instanceof CompositeType) {
+                                       CompositeType<?> cType = 
(CompositeType<?>)pField.type;
+                                       
cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
                                        keyPosition += cType.getTotalFields()-1;
                                } else {
-                                       throw new RuntimeException("Unexpected 
key type: "+field.type);
+                                       result.add(new 
NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, 
pField.type));
                                }
                                keyPosition++;
                        }
                        return;
+               } else {
+                       field = matcher.group(1);
+               }
+
+               // get field
+               int fieldPos = -1;
+               TypeInformation<?> fieldType = null;
+               for (int i = 0; i < fields.length; i++) {
+                       if (fields[i].field.getName().equals(field)) {
+                               fieldPos = i;
+                               fieldType = fields[i].type;
+                               break;
+                       }
+               }
+               if (fieldPos == -1) {
+                       throw new InvalidFieldReferenceException("Unable to 
find field \""+field+"\" in type "+this+".");
                }
-               Validate.notEmpty(fieldExpression, "Field expression must not 
be empty.");
-               // if there is a dot try getting the field from that sub field
-               int firstDot = fieldExpression.indexOf('.');
-               if (firstDot == -1) {
-                       // this is the last field (or only field) in the field 
expression
-                       int fieldId = 0;
-                       for (int i = 0; i < fields.length; i++) {
-                               if(fields[i].type instanceof CompositeType) {
-                                       fieldId += 
fields[i].type.getTotalFields()-1;
+               String tail = matcher.group(3);
+               if(tail == null) {
+                       if(fieldType instanceof CompositeType) {
+                               // forward offset
+                               for(int i=0; i<fieldPos; i++) {
+                                       offset += 
this.getTypeAt(i).getTotalFields();
                                }
-                               if 
(fields[i].field.getName().equals(fieldExpression)) {
-                                       if(fields[i].type instanceof 
CompositeType) {
-                                               throw new 
IllegalArgumentException("The specified field '"+fieldExpression+"' is refering 
to a composite type.\n"
-                                                               + "Either 
select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' 
operator or specify a field in the sub-type");
-                                       }
-                                       result.add(new 
FlatFieldDescriptor(offset + fieldId, fields[i].type));
-                                       return;
+                               // add all fields of composite type
+                               ((CompositeType) fieldType).getFlatFields("*", 
offset, result);
+                               return;
+                       } else {
+                               // we found the field to add
+                               // compute flat field position by adding 
skipped fields
+                               int flatFieldPos = offset;
+                               for(int i=0; i<fieldPos; i++) {
+                                       flatFieldPos += 
this.getTypeAt(i).getTotalFields();
                                }
-                               fieldId++;
+                               result.add(new 
FlatFieldDescriptor(flatFieldPos, fieldType));
+                               // nothing left to do
+                               return;
                        }
                } else {
-                       // split and go deeper
-                       String firstField = fieldExpression.substring(0, 
firstDot);
-                       String rest = fieldExpression.substring(firstDot + 1);
-                       int fieldId = 0;
-                       for (int i = 0; i < fields.length; i++) {
-                               if 
(fields[i].field.getName().equals(firstField)) {
-                                       if (!(fields[i].type instanceof 
CompositeType<?>)) {
-                                               throw new 
RuntimeException("Field "+fields[i].type+" (specified by '"+fieldExpression+"') 
is not a composite type");
-                                       }
-                                       CompositeType<?> cType = 
(CompositeType<?>) fields[i].type;
-                                       cType.getKey(rest, offset + fieldId, 
result); // recurse
-                                       return;
+                       if(fieldType instanceof CompositeType<?>) {
+                               // forward offset
+                               for(int i=0; i<fieldPos; i++) {
+                                       offset += 
this.getTypeAt(i).getTotalFields();
                                }
-                               fieldId += fields[i].type.getTotalFields();
+                               ((CompositeType) fieldType).getFlatFields(tail, 
offset, result);
+                               // nothing left to do
+                               return;
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
+                       }
+               }
+       }
+
+       public TypeInformation<?> getTypeAt(String fieldExpression) {
+
+               Matcher matcher = 
PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+               if(!matcher.matches()) {
+                       if 
(fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+                               throw new 
InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Invalid format of POJO field expression 
\""+fieldExpression+"\".");
+                       }
+               }
+
+               String field = matcher.group(1);
+               // get field
+               int fieldPos = -1;
+               TypeInformation<?> fieldType = null;
+               for (int i = 0; i < fields.length; i++) {
+                       if (fields[i].field.getName().equals(field)) {
+                               fieldPos = i;
+                               fieldType = fields[i].type;
+                               break;
+                       }
+               }
+               if (fieldPos == -1) {
+                       throw new InvalidFieldReferenceException("Unable to 
find field \""+field+"\" in type "+this+".");
+               }
+
+               String tail = matcher.group(3);
+               if(tail == null) {
+                       // we found the type
+                       return fieldType;
+               } else {
+                       if(fieldType instanceof CompositeType<?>) {
+                               return ((CompositeType) 
fieldType).getTypeAt(tail);
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
                        }
-                       throw new RuntimeException("Unable to find field 
"+fieldExpression+" in type "+this+" (looking for '"+firstField+"')");
                }
        }
 
@@ -248,4 +314,23 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
                                + ", fields = [" + Joiner.on(", 
").join(fieldStrings) + "]"
                                + ">";
        }
+
+       public static class NamedFlatFieldDescriptor extends 
FlatFieldDescriptor {
+
+               private String fieldName;
+
+               public NamedFlatFieldDescriptor(String name, int keyPosition, 
TypeInformation<?> type) {
+                       super(keyPosition, type);
+                       this.fieldName = name;
+               }
+
+               public String getFieldName() {
+                       return fieldName;
+               }
+
+               @Override
+               public String toString() {
+                       return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 22f7942..effec2b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -20,23 +20,32 @@ package org.apache.flink.api.java.typeutils;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 
-import com.google.common.base.Preconditions;
-
 public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
-       
+
+       private final static String REGEX_FIELD = "(f?)([0-9]+)";
+       private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
+       private final static String REGEX_NESTED_FIELDS_WILDCARD = 
REGEX_NESTED_FIELDS
+                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+       private static final Pattern PATTERN_FIELD = 
Pattern.compile(REGEX_FIELD);
+       private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
+       private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
        protected final TypeInformation<?>[] types;
        
        protected final Class<T> tupleType;
 
        private int totalFields;
-       
+
        public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... 
types) {
                super(tupleType);
                this.tupleType = tupleType;
@@ -90,82 +99,114 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                        startKeyId += type.getTotalFields();
                }
        }
-       
 
        @Override
-       public void getKey(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
-               // handle 'select all'
-               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+       public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
+
+               Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+               if (!matcher.matches()) {
+                       throw new InvalidFieldReferenceException("Invalid tuple 
field reference \""+fieldExpression+"\".");
+               }
+
+               String field = matcher.group(0);
+               if (field.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+                       // handle select all
                        int keyPosition = 0;
-                       for(TypeInformation<?> type : types) {
-                               if(type instanceof AtomicType) {
-                                       result.add(new 
FlatFieldDescriptor(offset + keyPosition, type));
-                               } else if(type instanceof CompositeType) {
-                                       CompositeType<?> cType = 
(CompositeType<?>)type;
-                                       
cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
-                                       keyPosition += cType.getTotalFields()-1;
+                       for (TypeInformation<?> type : types) {
+                               if (type instanceof CompositeType) {
+                                       CompositeType<?> cType = 
(CompositeType<?>) type;
+                                       
cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
+                                       keyPosition += cType.getTotalFields() - 
1;
                                } else {
-                                       throw new RuntimeException("Unexpected 
key type: "+type);
+                                       result.add(new 
FlatFieldDescriptor(offset + keyPosition, type));
                                }
                                keyPosition++;
                        }
                        return;
                }
-               // check input
-               if(fieldExpression.length() < 2) {
-                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect. The length must be at least 2");
-               }
-               if(fieldExpression.charAt(0) != 'f') {
-                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect for a Tuple type. It has to start 
with an 'f'");
-               }
-               // get first component of nested expression
-               int dotPos = fieldExpression.indexOf('.');
-               String nestedSplitFirst = fieldExpression;
-               if(dotPos != -1 ) {
-                       Preconditions.checkArgument(dotPos != 
fieldExpression.length()-1, "The field expression can never end with a dot.");
-                       nestedSplitFirst = fieldExpression.substring(0, dotPos);
-               }
-               String fieldNumStr = nestedSplitFirst.substring(1, 
nestedSplitFirst.length());
-               if(!StringUtils.isNumeric(fieldNumStr)) {
-                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is 
not numeric");
-               }
-               int pos = -1;
-               try {
-                       pos = Integer.valueOf(fieldNumStr);
-               } catch(NumberFormatException nfe) {
-                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is 
not numeric", nfe);
+
+               String fieldStr = matcher.group(1);
+               Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
+               if (!fieldMatcher.matches()) {
+                       throw new RuntimeException("Invalid matcher pattern");
                }
-               if(pos < 0) {
-                       throw new IllegalArgumentException("Negative position 
is not possible");
+               field = fieldMatcher.group(2);
+               int fieldPos = Integer.valueOf(field);
+
+               if (fieldPos >= this.getArity()) {
+                       throw new InvalidFieldReferenceException("Tuple field 
expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
                }
-               // pass down the remainder (after the dot) of the 
fieldExpression to the type at that position.
-               if(dotPos != -1) { // we need to go deeper
-                       String rem = fieldExpression.substring(dotPos+1);
-                       if( !(types[pos] instanceof CompositeType<?>) ) {
-                               throw new RuntimeException("Element at position 
"+pos+" is not a composite type. There are no nested types to select");
+               TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
+               String tail = matcher.group(5);
+               if(tail == null) {
+                       if(fieldType instanceof CompositeType) {
+                               // forward offsets
+                               for(int i=0; i<fieldPos; i++) {
+                                       offset += 
this.getTypeAt(i).getTotalFields();
+                               }
+                               // add all fields of composite type
+                               ((CompositeType) fieldType).getFlatFields("*", 
offset, result);
+                               return;
+                       } else {
+                               // we found the field to add
+                               // compute flat field position by adding 
skipped fields
+                               int flatFieldPos = offset;
+                               for(int i=0; i<fieldPos; i++) {
+                                       flatFieldPos += 
this.getTypeAt(i).getTotalFields();
+                               }
+                               result.add(new 
FlatFieldDescriptor(flatFieldPos, fieldType));
+                               // nothing left to do
+                               return;
                        }
-                       CompositeType<?> cType = (CompositeType<?>) types[pos];
-                       // count nested fields before "pos"
-                       for (int i = 0; i < pos; i++) {
-                               offset += types[i].getTotalFields();
+               } else {
+                       if(fieldType instanceof CompositeType<?>) {
+                               // forward offset
+                               for(int i=0; i<fieldPos; i++) {
+                                       offset += 
this.getTypeAt(i).getTotalFields();
+                               }
+                               ((CompositeType) fieldType).getFlatFields(tail, 
offset, result);
+                               // nothing left to do
+                               return;
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
                        }
-                       cType.getKey(rem, offset, result);
-                       return;
                }
-               
-               if(pos >= types.length) {
-                       throw new IllegalArgumentException("The specified tuple 
position does not exist");
+       }
+
+       public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+
+               Matcher matcher = 
PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+               if(!matcher.matches()) {
+                       if 
(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+                               throw new 
InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Invalid format of tuple field expression 
\""+fieldExpression+"\".");
+                       }
                }
-               
-               // count nested fields before "pos".
-               for(int i = 0; i < pos; i++) {
-                       offset += types[i].getTotalFields() - 1; // this adds 
only something to offset if its a composite type.
+
+               String fieldStr = matcher.group(1);
+               Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
+               if(!fieldMatcher.matches()) {
+                       throw new RuntimeException("Invalid matcher pattern");
                }
-               if(types[pos] instanceof CompositeType) {
-                       throw new IllegalArgumentException("The specified field 
'"+fieldExpression+"' is refering to a composite type.\n"
-                                       + "Either select all elements in this 
type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field 
in the sub-type");
+               String field = fieldMatcher.group(2);
+               int fieldPos = Integer.valueOf(field);
+
+               if(fieldPos >= this.getArity()) {
+                       throw new InvalidFieldReferenceException("Tuple field 
expression \""+fieldStr+"\" out of bounds of "+this.toString()+".");
+               }
+               TypeInformation<X> fieldType = this.getTypeAt(fieldPos);
+               String tail = matcher.group(5);
+               if(tail == null) {
+                       // we found the type
+                       return fieldType;
+               } else {
+                       if(fieldType instanceof CompositeType<?>) {
+                               return ((CompositeType) 
fieldType).getTypeAt(tail);
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
+                       }
                }
-               result.add(new FlatFieldDescriptor(offset + pos, types[pos]));
        }
        
        public <X> TypeInformation<X> getTypeAt(int pos) {

Reply via email to