[FLINK-5358] [types] Add support to extract RowTypeInfo from Row instance. This closes #3027.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2af939a1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2af939a1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2af939a1 Branch: refs/heads/master Commit: 2af939a10288348eedb56fc0959daee77c9cdcf3 Parents: 38ded2b Author: tonycox <[email protected]> Authored: Fri Dec 16 20:55:40 2016 +0400 Committer: Fabian Hueske <[email protected]> Committed: Tue Jan 10 20:56:28 2017 +0100 ---------------------------------------------------------------------- .../flink/api/java/typeutils/TypeExtractor.java | 17 +++++++++++++++++ .../api/java/typeutils/TypeExtractorTest.java | 20 +++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2af939a1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index df4a0e0..8bf2867 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -49,6 +49,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable; +import org.apache.flink.types.Row; import org.apache.flink.types.Value; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -1894,6 +1895,22 @@ public class TypeExtractor { } return new TupleTypeInfo(value.getClass(), infos); } + else if (value instanceof Row) { + Row row = (Row) value; + int arity = row.getArity(); + for (int i = 0; i < arity; i++) { + if (row.getField(i) == null) { + LOG.warn("Cannot extract type of Row field, because of Row field[" + i + "] is null. " + + "Should define RowTypeInfo explicitly."); + return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>()); + } + } + TypeInformation<?>[] typeArray = new TypeInformation<?>[arity]; + for (int i = 0; i < arity; i++) { + typeArray[i] = TypeExtractor.getForObject(row.getField(i)); + } + return (TypeInformation<X>) new RowTypeInfo(typeArray); + } else { return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>()); } http://git-wip-us.apache.org/repos/asf/flink/blob/2af939a1/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index 55cd42d..804cf88 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -61,6 +61,7 @@ import org.apache.flink.types.DoubleValue; import org.apache.flink.types.IntValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; +import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -345,8 +346,25 @@ public class TypeExtractorTest { Assert.assertFalse(TypeExtractor.getForClass(PojoWithNonPublicDefaultCtor.class) instanceof PojoTypeInfo); } - + @Test + public void testRow() { + Row row = new Row(2); + row.setField(0, "string"); + row.setField(1, 15); + TypeInformation<Row> rowInfo = TypeExtractor.getForObject(row); + Assert.assertEquals(rowInfo.getClass(), RowTypeInfo.class); + Assert.assertEquals(2, rowInfo.getArity()); + Assert.assertEquals( + new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO), + rowInfo); + + Row nullRow = new Row(2); + TypeInformation<Row> genericRowInfo = TypeExtractor.getForObject(nullRow); + Assert.assertEquals(genericRowInfo, new GenericTypeInfo<>(Row.class)); + } public static class CustomType { public String myField1;
