Repository: flink
Updated Branches:
  refs/heads/release-1.2 a504abe46 -> 386bdd299


[FLINK-5348] [core] Add support for custom field names to RowTypeInfo.

This closes #3020


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/386bdd29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/386bdd29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/386bdd29

Branch: refs/heads/release-1.2
Commit: 386bdd299d3df60562d86e36517410fe44a06291
Parents: a504abe
Author: Jark Wu <[email protected]>
Authored: Fri Dec 16 17:24:01 2016 +0800
Committer: twalthr <[email protected]>
Committed: Wed Jan 11 14:08:55 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/RowTypeInfo.java   | 148 +++++++++++++++++++
 .../api/java/typeutils/RowTypeInfoTest.java     |  85 ++++++++++-
 2 files changed, 232 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/386bdd29/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index 03cbe61..a1b348a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -19,7 +19,9 @@ package org.apache.flink.api.java.typeutils;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.RowComparator;
@@ -29,7 +31,13 @@ import org.apache.flink.types.Row;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -40,6 +48,20 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 
        private static final long serialVersionUID = 9158518989896601963L;
 
+       /** Matches a purely positional field reference, i.e. one or more digits. */
+       private static final String REGEX_INT_FIELD = "[0-9]+";
+       /** Matches a field name: a letter, underscore or dollar sign, followed by letters, digits, underscores or dollar signs. */
+       private static final String REGEX_STR_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
+       /** Matches either a named or a positional field reference. */
+       private static final String REGEX_FIELD = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD;
+       /** Group 1 captures the leading field, group 3 the optional remainder after the first dot. */
+       private static final String REGEX_NESTED_FIELDS = "(" + REGEX_FIELD + ")(\\.(.+))?";
+       /** Same as the nested-field expression, but additionally accepts each select-all character on its own. */
+       private static final String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+                       + "|\\" + ExpressionKeys.SELECT_ALL_CHAR
+                       + "|\\" + ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+       // compiled once and shared, the patterns are used on every field expression lookup
+       private static final Pattern PATTERN_INT_FIELD = Pattern.compile(REGEX_INT_FIELD);
+       private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
+       private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
+       // 
--------------------------------------------------------------------------------------------
+
        protected final String[] fieldNames;
        /** Temporary variable for directly passing orders to comparators. */
        private boolean[] comparatorOrders = null;
@@ -54,6 +76,122 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
                }
        }
 
+       public RowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
+               super(Row.class, types);
+               checkNotNull(fieldNames, "FieldNames should not be null.");
+               checkArgument(
+                       types.length == fieldNames.length,
+                       "Number of field types and names is different.");
+               checkArgument(
+                       !hasDuplicateFieldNames(fieldNames),
+                       "Field names are not unique.");
+
+               this.fieldNames = Arrays.copyOf(fieldNames, fieldNames.length);
+       }
+
+       /**
+        * Resolves a field expression to the flat fields it covers and appends them to {@code result}.
+        *
+        * <p>The expression is either one of the select-all characters, or a field given by name or
+        * by position, optionally followed by {@code .} and a nested expression that is handed on to
+        * the composite type of that field.
+        *
+        * @param fieldExpression the expression to resolve
+        * @param offset the flat position at which the fields of this row type start
+        * @param result the list the resolved flat field descriptors are added to
+        * @throws InvalidFieldReferenceException if the expression is malformed or addresses a
+        *                                        nested field of a non-composite type
+        */
+       @Override
+       public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+               Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+
+               if (!matcher.matches()) {
+                       throw new InvalidFieldReferenceException(
+                               "Invalid tuple field reference \"" + fieldExpression + "\".");
+               }
+
+               String field = matcher.group(0);
+
+               if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
+                       (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
+                       // handle select all
+                       int keyPosition = 0;
+                       for (TypeInformation<?> fType : types) {
+                               if (fType instanceof CompositeType) {
+                                       CompositeType<?> cType = (CompositeType<?>) fType;
+                                       cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
+                                       // the unconditional increment below accounts for the remaining field
+                                       keyPosition += cType.getTotalFields() - 1;
+                               } else {
+                                       result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
+                               }
+                               keyPosition++;
+                       }
+               } else {
+                       field = matcher.group(1);
+
+                       Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
+                       int fieldIndex;
+                       if (intFieldMatcher.matches()) {
+                               // field expression is an integer; parseInt avoids a needless box/unbox round trip
+                               fieldIndex = Integer.parseInt(field);
+                       } else {
+                               fieldIndex = this.getFieldIndex(field);
+                       }
+                       // fetching the field type will throw an exception if the index is illegal
+                       TypeInformation<?> fieldType = this.getTypeAt(fieldIndex);
+                       // compute the offset: skip the flat fields of all preceding fields
+                       for (int i = 0; i < fieldIndex; i++) {
+                               offset += this.getTypeAt(i).getTotalFields();
+                       }
+
+                       String tail = matcher.group(3);
+
+                       if (tail == null) {
+                               // expression has no nested field
+                               if (fieldType instanceof CompositeType) {
+                                       ((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
+                               } else {
+                                       result.add(new FlatFieldDescriptor(offset, fieldType));
+                               }
+                       } else {
+                               // expression has a nested field
+                               if (fieldType instanceof CompositeType) {
+                                       ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
+                               } else {
+                                       throw new InvalidFieldReferenceException(
+                                               "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Returns the type information of the field addressed by the given expression.
+        *
+        * <p>The leading field may be given by name or by position. An optional remainder after
+        * the first {@code .} is resolved by the composite type of that field.
+        *
+        * @param fieldExpression the expression addressing a single, possibly nested, field
+        * @return the type information of the addressed field
+        * @throws InvalidFieldReferenceException if the expression is a wildcard, is malformed, or
+        *                                        addresses a nested field of a non-composite type
+        */
+       @Override
+       public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+               Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+               if (!matcher.matches()) {
+                       if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
+                               fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+                               throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+                       } else {
+                               throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\".");
+                       }
+               }
+
+               String field = matcher.group(1);
+
+               Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
+               int fieldIndex;
+               if (intFieldMatcher.matches()) {
+                       // field expression is an integer; parseInt avoids a needless box/unbox round trip
+                       fieldIndex = Integer.parseInt(field);
+               } else {
+                       fieldIndex = this.getFieldIndex(field);
+               }
+               // fetching the field type will throw an exception if the index is illegal
+               TypeInformation<X> fieldType = this.getTypeAt(fieldIndex);
+
+               String tail = matcher.group(3);
+               if (tail == null) {
+                       // found the type
+                       return fieldType;
+               } else {
+                       if (fieldType instanceof CompositeType) {
+                               return ((CompositeType<?>) fieldType).getTypeAt(tail);
+                       } else {
+                               throw new InvalidFieldReferenceException(
+                                       "Nested field expression \""+ tail + "\" not possible on atomic type "+fieldType+".");
+                       }
+               }
+       }
+
        @Override
        public TypeComparator<Row> createComparator(
                int[] logicalKeyFields,
@@ -129,6 +267,16 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
                return bld.toString();
        }
 
+       /**
+        * Checks whether any name occurs more than once in the given array.
+        *
+        * @param fieldNames the names to inspect
+        * @return {@code true} if at least two entries are equal, {@code false} otherwise
+        */
+       private boolean hasDuplicateFieldNames(String[] fieldNames) {
+               // a set drops repeated entries, so it is smaller than the array exactly when a name repeats
+               HashSet<String> distinctNames = new HashSet<>(Arrays.asList(fieldNames));
+               return distinctNames.size() != fieldNames.length;
+       }
+
        private class RowTypeComparatorBuilder implements 
TypeComparatorBuilder<Row> {
 
                private final ArrayList<TypeComparator> fieldComparators = new 
ArrayList<TypeComparator>();

http://git-wip-us.apache.org/repos/asf/flink/blob/386bdd29/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
index 8de7bf7..45f616c 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -18,12 +18,95 @@
 package org.apache.flink.api.java.typeutils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 public class RowTypeInfoTest {
+       // shared fixture: an int, a nested row of (short, big decimal) and a string
+       private static TypeInformation<?>[] typeList = {
+               BasicTypeInfo.INT_TYPE_INFO,
+               new RowTypeInfo(BasicTypeInfo.SHORT_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO),
+               BasicTypeInfo.STRING_TYPE_INFO
+       };
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrongNumberOfFieldNames() {
+               new RowTypeInfo(typeList, new String[]{"int", "string"});
+               // number of field names should be equal to number of types, go 
fail
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDuplicateCustomFieldNames() {
+               new RowTypeInfo(typeList, new String[]{"int", "string", 
"string"});
+               // field names should not be the same, go fail
+       }
+
+       /**
+        * Verifies that custom field names are exposed, can be used in field expressions, and are
+        * copied by the constructor instead of being shared with the caller's array.
+        */
+       @Test
+       public void testCustomFieldNames() {
+               String[] fieldNames = new String[]{"int", "row", "string"};
+               // pass the very array that is mutated below; with a separate array the copy check proves nothing
+               RowTypeInfo typeInfo1 = new RowTypeInfo(typeList, fieldNames);
+               assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames());
+
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt("string"));
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt(2));
+               assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo1.getTypeAt("row.0"));
+               assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, typeInfo1.getTypeAt("row.f1"));
+
+               // change the names in fieldNames
+               fieldNames[1] = "composite";
+               RowTypeInfo typeInfo2 = new RowTypeInfo(typeList, fieldNames);
+               // typeInfo1 must hold its own copy of the names and therefore not see the change
+               assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames());
+               assertArrayEquals(new String[]{"int", "composite", "string"}, typeInfo2.getFieldNames());
+       }
+
+       @Test
+       public void testGetFlatFields() {
+               RowTypeInfo typeInfo1 = new RowTypeInfo(typeList, new 
String[]{"int", "row", "string"});
+               List<FlatFieldDescriptor> result = new ArrayList<>();
+               typeInfo1.getFlatFields("row.*", 0, result);
+               assertEquals(2, result.size());
+               assertEquals(
+                       new FlatFieldDescriptor(1, 
BasicTypeInfo.SHORT_TYPE_INFO).toString(),
+                       result.get(0).toString());
+               assertEquals(
+                       new FlatFieldDescriptor(2, 
BasicTypeInfo.BIG_DEC_TYPE_INFO).toString(),
+                       result.get(1).toString());
+
+               result.clear();
+               typeInfo1.getFlatFields("string", 0, result);
+               assertEquals(1, result.size());
+               assertEquals(
+                       new FlatFieldDescriptor(3, 
BasicTypeInfo.STRING_TYPE_INFO).toString(),
+                       result.get(0).toString());
+       }
+
+       @Test
+       public void testGetTypeAt() {
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       new RowTypeInfo(
+                               BasicTypeInfo.SHORT_TYPE_INFO,
+                               BasicTypeInfo.BIG_DEC_TYPE_INFO
+                       ),
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+
+               assertArrayEquals(new String[]{"f0", "f1", "f2"}, 
typeInfo.getFieldNames());
+
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
typeInfo.getTypeAt("f2"));
+               assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, 
typeInfo.getTypeAt("f1.f0"));
+               assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
typeInfo.getTypeAt("f1.1"));
+       }
 
        @Test
        public void testRowTypeInfoEquality() {
@@ -59,7 +142,7 @@ public class RowTypeInfoTest {
                        BasicTypeInfo.INT_TYPE_INFO,
                        new RowTypeInfo(
                                BasicTypeInfo.SHORT_TYPE_INFO,
-                           BasicTypeInfo.BIG_DEC_TYPE_INFO
+                               BasicTypeInfo.BIG_DEC_TYPE_INFO
                        ),
                        BasicTypeInfo.STRING_TYPE_INFO);
 

Reply via email to