[FLINK-5358] [types] Add support to extract RowTypeInfo from Row instance.

This closes #3027.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2af939a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2af939a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2af939a1

Branch: refs/heads/master
Commit: 2af939a10288348eedb56fc0959daee77c9cdcf3
Parents: 38ded2b
Author: tonycox <[email protected]>
Authored: Fri Dec 16 20:55:40 2016 +0400
Committer: Fabian Hueske <[email protected]>
Committed: Tue Jan 10 20:56:28 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 17 +++++++++++++++++
 .../api/java/typeutils/TypeExtractorTest.java   | 20 +++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2af939a1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index df4a0e0..8bf2867 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -49,6 +49,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
+import org.apache.flink.types.Row;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -1894,6 +1895,22 @@ public class TypeExtractor {
                        }
                        return new TupleTypeInfo(value.getClass(), infos);
                }
+               else if (value instanceof Row) {
+                       Row row = (Row) value;
+                       int arity = row.getArity();
+                       for (int i = 0; i < arity; i++) {
+                               if (row.getField(i) == null) {
+                                       LOG.warn("Cannot extract type of Row field, because of Row field[" + i + "] is null. " +
+                                               "Should define RowTypeInfo explicitly.");
+                                       return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
+                               }
+                       }
+                       TypeInformation<?>[] typeArray = new TypeInformation<?>[arity];
+                       for (int i = 0; i < arity; i++) {
+                               typeArray[i] = TypeExtractor.getForObject(row.getField(i));
+                       }
+                       return (TypeInformation<X>) new RowTypeInfo(typeArray);
+               }
                else {
                        return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af939a1/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
index 55cd42d..804cf88 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -345,8 +346,25 @@ public class TypeExtractorTest {
 
                
Assert.assertFalse(TypeExtractor.getForClass(PojoWithNonPublicDefaultCtor.class)
 instanceof PojoTypeInfo);
        }
-       
 
+       @Test
+       public void testRow() {
+               Row row = new Row(2);
+               row.setField(0, "string");
+               row.setField(1, 15);
+               TypeInformation<Row> rowInfo = TypeExtractor.getForObject(row);
+               Assert.assertEquals(rowInfo.getClass(), RowTypeInfo.class);
+               Assert.assertEquals(2, rowInfo.getArity());
+               Assert.assertEquals(
+                       new RowTypeInfo(
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.INT_TYPE_INFO),
+                       rowInfo);
+
+               Row nullRow = new Row(2);
+               TypeInformation<Row> genericRowInfo = TypeExtractor.getForObject(nullRow);
+               Assert.assertEquals(genericRowInfo, new GenericTypeInfo<>(Row.class));
+       }
        
        public static class CustomType {
                public String myField1;

Reply via email to