Repository: flink Updated Branches: refs/heads/release-1.2 a504abe46 -> 386bdd299
[FLINK-5348] [core] Add support for custom field names to RowTypeInfo. This closes #3020 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/386bdd29 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/386bdd29 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/386bdd29 Branch: refs/heads/release-1.2 Commit: 386bdd299d3df60562d86e36517410fe44a06291 Parents: a504abe Author: Jark Wu <[email protected]> Authored: Fri Dec 16 17:24:01 2016 +0800 Committer: twalthr <[email protected]> Committed: Wed Jan 11 14:08:55 2017 +0100 ---------------------------------------------------------------------- .../flink/api/java/typeutils/RowTypeInfo.java | 148 +++++++++++++++++++ .../api/java/typeutils/RowTypeInfoTest.java | 85 ++++++++++- 2 files changed, 232 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/386bdd29/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java index 03cbe61..a1b348a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java @@ -19,7 +19,9 @@ package org.apache.flink.api.java.typeutils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.RowComparator; @@ -29,7 +31,13 @@ import org.apache.flink.types.Row; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** @@ -40,6 +48,20 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> { private static final long serialVersionUID = 9158518989896601963L; + private static final String REGEX_INT_FIELD = "[0-9]+"; + private static final String REGEX_STR_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; + private static final String REGEX_FIELD = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD; + private static final String REGEX_NESTED_FIELDS = "(" + REGEX_FIELD + ")(\\.(.+))?"; + private static final String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + "|\\" + + ExpressionKeys.SELECT_ALL_CHAR + "|\\" + + ExpressionKeys.SELECT_ALL_CHAR_SCALA; + + private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); + private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); + private static final Pattern PATTERN_INT_FIELD = Pattern.compile(REGEX_INT_FIELD); + + // -------------------------------------------------------------------------------------------- + protected final String[] fieldNames; /** Temporary variable for directly passing orders to comparators. */ private boolean[] comparatorOrders = null; @@ -54,6 +76,122 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> { } } + public RowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) { + super(Row.class, types); + checkNotNull(fieldNames, "FieldNames should not be null."); + checkArgument( + types.length == fieldNames.length, + "Number of field types and names is different."); + checkArgument( + !hasDuplicateFieldNames(fieldNames), + "Field names are not unique."); + + this.fieldNames = Arrays.copyOf(fieldNames, fieldNames.length); + } + + @Override + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + + if (!matcher.matches()) { + throw new InvalidFieldReferenceException( + "Invalid tuple field reference \"" + fieldExpression + "\"."); + } + + String field = matcher.group(0); + + if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) || + (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) { + // handle select all + int keyPosition = 0; + for (TypeInformation<?> fType : types) { + if (fType instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>) fType; + cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result); + keyPosition += cType.getTotalFields() - 1; + } else { + result.add(new FlatFieldDescriptor(offset + keyPosition, fType)); + } + keyPosition++; + } + } else { + field = matcher.group(1); + + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + int fieldIndex; + if (intFieldMatcher.matches()) { + // field expression is an integer + fieldIndex = Integer.valueOf(field); + } else { + fieldIndex = this.getFieldIndex(field); + } + // fetch the field type will throw exception if the index is illegal + TypeInformation<?> fieldType = this.getTypeAt(fieldIndex); + // compute the offset, + for (int i = 0; i < fieldIndex; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + + String tail = matcher.group(3); + + if (tail == null) { + // expression hasn't nested field + if (fieldType instanceof CompositeType) { + ((CompositeType) fieldType).getFlatFields("*", offset, result); + } else { + result.add(new FlatFieldDescriptor(offset, fieldType)); + } + } else { + // expression has nested field + if (fieldType instanceof CompositeType) { + ((CompositeType) fieldType).getFlatFields(tail, offset, result); + } else { + throw new InvalidFieldReferenceException( + "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + "."); + } + } + } + } + + @Override + public <X> TypeInformation<X> getTypeAt(String fieldExpression) { + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if (!matcher.matches()) { + if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || + fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\"."); + } + } + + String field = matcher.group(1); + + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + int fieldIndex; + if (intFieldMatcher.matches()) { + // field expression is an integer + fieldIndex = Integer.valueOf(field); + } else { + fieldIndex = this.getFieldIndex(field); + } + // fetch the field type will throw exception if the index is illegal + TypeInformation<X> fieldType = this.getTypeAt(fieldIndex); + + String tail = matcher.group(3); + if (tail == null) { + // found the type + return fieldType; + } else { + if (fieldType instanceof CompositeType) { + return ((CompositeType<?>) fieldType).getTypeAt(tail); + } else { + throw new InvalidFieldReferenceException( + "Nested field expression \""+ tail + "\" not possible on atomic type "+fieldType+"."); + } + } + } + @Override public TypeComparator<Row> createComparator( int[] logicalKeyFields, @@ -129,6 +267,16 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> { return bld.toString(); } + private boolean hasDuplicateFieldNames(String[] fieldNames) { + HashSet<String> names = new HashSet<>(); + for (String field : fieldNames) { + if (!names.add(field)) { + return true; + } + } + return false; + } + private class RowTypeComparatorBuilder implements TypeComparatorBuilder<Row> { private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>(); http://git-wip-us.apache.org/repos/asf/flink/blob/386bdd29/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java index 8de7bf7..45f616c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java @@ -18,12 +18,95 @@ package org.apache.flink.api.java.typeutils; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; public class RowTypeInfoTest { + private static TypeInformation<?>[] typeList = new TypeInformation<?>[]{ + BasicTypeInfo.INT_TYPE_INFO, + new RowTypeInfo( + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.BIG_DEC_TYPE_INFO), + BasicTypeInfo.STRING_TYPE_INFO + }; + + @Test(expected = IllegalArgumentException.class) + public void testWrongNumberOfFieldNames() { + new RowTypeInfo(typeList, new String[]{"int", "string"}); + // number of field names should be equal to number of types, go fail + } + + @Test(expected = IllegalArgumentException.class) + public void testDuplicateCustomFieldNames() { + new RowTypeInfo(typeList, new String[]{"int", "string", "string"}); + // field names should not be the same, go fail + } + + @Test + public void testCustomFieldNames() { + String[] fieldNames = new String[]{"int", "row", "string"}; + RowTypeInfo typeInfo1 = new RowTypeInfo(typeList, new String[]{"int", "row", "string"}); + assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames()); + + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt("string")); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt(2)); + assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo1.getTypeAt("row.0")); + assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, typeInfo1.getTypeAt("row.f1")); + + // change the names in fieldNames + fieldNames[1] = "composite"; + RowTypeInfo typeInfo2 = new RowTypeInfo(typeList, fieldNames); + // make sure the field names are deep copied + assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames()); + assertArrayEquals(new String[]{"int", "composite", "string"}, typeInfo2.getFieldNames()); + } + + @Test + public void testGetFlatFields() { + RowTypeInfo typeInfo1 = new RowTypeInfo(typeList, new String[]{"int", "row", "string"}); + List<FlatFieldDescriptor> result = new ArrayList<>(); + typeInfo1.getFlatFields("row.*", 0, result); + assertEquals(2, result.size()); + assertEquals( + new FlatFieldDescriptor(1, BasicTypeInfo.SHORT_TYPE_INFO).toString(), + result.get(0).toString()); + assertEquals( + new FlatFieldDescriptor(2, BasicTypeInfo.BIG_DEC_TYPE_INFO).toString(), + result.get(1).toString()); + + result.clear(); + typeInfo1.getFlatFields("string", 0, result); + assertEquals(1, result.size()); + assertEquals( + new FlatFieldDescriptor(3, BasicTypeInfo.STRING_TYPE_INFO).toString(), + result.get(0).toString()); + } + + @Test + public void testGetTypeAt() { + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + new RowTypeInfo( + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.BIG_DEC_TYPE_INFO + ), + BasicTypeInfo.STRING_TYPE_INFO); + + + assertArrayEquals(new String[]{"f0", "f1", "f2"}, typeInfo.getFieldNames()); + + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo.getTypeAt("f2")); + assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo.getTypeAt("f1.f0")); + assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, typeInfo.getTypeAt("f1.1")); + } @Test public void testRowTypeInfoEquality() { @@ -59,7 +142,7 @@ public class RowTypeInfoTest { BasicTypeInfo.INT_TYPE_INFO, new RowTypeInfo( BasicTypeInfo.SHORT_TYPE_INFO, - BasicTypeInfo.BIG_DEC_TYPE_INFO + BasicTypeInfo.BIG_DEC_TYPE_INFO ), BasicTypeInfo.STRING_TYPE_INFO);
