Repository: flink
Updated Branches:
  refs/heads/master 71d05d744 -> 91f9bfc78


[FLINK-1458] Allow Interfaces and abstract types in TypeExtractor

Kryo already supports them, so it was just a question of the
TypeExtractor allowing them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f9bfc7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f9bfc7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f9bfc7

Branch: refs/heads/master
Commit: 91f9bfc782cc190738bfd3ad822348728a053b46
Parents: 71d05d7
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Feb 2 16:08:18 2015 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Feb 5 12:00:59 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 36 +++++++++-----
 .../java/type/extractor/TypeExtractorTest.java  |  5 +-
 .../runtime/KryoGenericTypeSerializerTest.scala | 52 ++++++++++++++++++++
 .../ScalaSpecialTypesSerializerTest.scala       |  5 +-
 .../runtime/TraversableSerializerTest.scala     |  7 ++-
 5 files changed, 87 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 99292a6..124055c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -893,6 +893,10 @@ public class TypeExtractor {
                while (!(isClassType(curT) && 
typeToClass(curT).equals(stopAtClass))) {
                        typeHierarchy.add(curT);
                        curT = typeToClass(curT).getGenericSuperclass();
+
+                       if (curT == null) {
+                               break;
+                       }
                }
                return curT;
        }
@@ -1090,11 +1094,6 @@ public class TypeExtractor {
                        ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
                Validate.notNull(clazz);
                
-               // check for abstract classes or interfaces
-               if (!clazz.isPrimitive() && 
(Modifier.isInterface(clazz.getModifiers()) || 
(Modifier.isAbstract(clazz.getModifiers()) && !clazz.isArray()))) {
-                       throw new InvalidTypesException("Interfaces and 
abstract classes are not valid types: " + clazz);
-               }
-
                if (clazz.equals(Object.class)) {
                        return new GenericTypeInfo<OUT>(clazz);
                }
@@ -1153,6 +1152,11 @@ public class TypeExtractor {
 
                alreadySeen.add(clazz);
 
+               if (Modifier.isInterface(clazz.getModifiers())) {
+                       // Interface has no members and is therefore not 
handled as POJO
+                       return new GenericTypeInfo<OUT>(clazz);
+               }
+
                if (clazz.equals(Class.class)) {
                        // special case handling for Class, this should not be 
handled by the POJO logic
                        return new GenericTypeInfo<OUT>(clazz);
@@ -1228,10 +1232,10 @@ public class TypeExtractor {
                                return true;
                        } else {
                                if(!hasGetter) {
-                                       LOG.warn("Class "+clazz+" does not 
contain a getter for field "+f.getName() );
+                                       LOG.debug("Class "+clazz+" does not 
contain a getter for field "+f.getName() );
                                }
                                if(!hasSetter) {
-                                       LOG.warn("Class "+clazz+" does not 
contain a setter for field "+f.getName() );
+                                       LOG.debug("Class "+clazz+" does not 
contain a setter for field "+f.getName() );
                                }
                                return false;
                        }
@@ -1251,11 +1255,16 @@ public class TypeExtractor {
                }
                
                List<Field> fields = getAllDeclaredFields(clazz);
+               if(fields.size() == 0) {
+                       LOG.info("No fields detected for class " + clazz + ". 
Cannot be used as a PojoType. Will be handled as GenericType");
+                       return new GenericTypeInfo<OUT>(clazz);
+               }
+
                List<PojoField> pojoFields = new ArrayList<PojoField>();
                for (Field field : fields) {
                        Type fieldType = field.getGenericType();
                        if(!isValidPojoField(field, clazz, typeHierarchy)) {
-                               LOG.warn("Class "+clazz+" is not a valid POJO 
type");
+                               LOG.info("Class " + clazz + " is not a valid 
POJO type");
                                return null;
                        }
                        try {
@@ -1281,7 +1290,7 @@ public class TypeExtractor {
                List<Method> methods = getAllDeclaredMethods(clazz);
                for (Method method : methods) {
                        if (method.getName().equals("readObject") || 
method.getName().equals("writeObject")) {
-                               LOG.warn("Class "+clazz+" contains custom 
serialization methods we do not call.");
+                               LOG.info("Class "+clazz+" contains custom 
serialization methods we do not call.");
                                return null;
                        }
                }
@@ -1291,8 +1300,13 @@ public class TypeExtractor {
                try {
                        clazz.getDeclaredConstructor();
                } catch (NoSuchMethodException e) {
-                       LOG.warn("Class " + clazz + " must have a default 
constructor to be used as a POJO.");
-                       return null;
+                       if (clazz.isInterface() || 
Modifier.isAbstract(clazz.getModifiers())) {
+                               LOG.info("Class " + clazz + " is abstract or an 
interface, having a concrete " +
+                                               "type can increase 
performance.");
+                       } else {
+                               LOG.info("Class " + clazz + " must have a 
default constructor to be used as a POJO.");
+                               return null;
+                       }
                }
                
                // everything is checked, we return the pojo

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index f234ed5..748f81c 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -1090,7 +1091,7 @@ public class TypeExtractorTest {
                };
                
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO, null, 
true);
-               Assert.assertTrue(ti instanceof MissingTypeInfo);
+               Assert.assertTrue(ti instanceof GenericTypeInfo);
 
                RichMapFunction<String, ?> function2 = new 
RichMapFunction<String, AbstractClass>() {
                        private static final long serialVersionUID = 1L;
@@ -1102,7 +1103,7 @@ public class TypeExtractorTest {
                };
 
                TypeInformation<?> ti2 = 
TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO, 
null, true);
-               Assert.assertTrue(ti2 instanceof MissingTypeInfo);
+               Assert.assertTrue(ti2 instanceof GenericTypeInfo);
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index d61edff..c396f9f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -32,6 +32,58 @@ import com.esotericsoftware.kryo.io.Input
 class KryoGenericTypeSerializerTest {
 
   @Test
+  def testTraitSerialization(): Unit = {
+    trait SimpleTrait {
+      def contains(x: String): Boolean
+    }
+    class SimpleClass1 extends SimpleTrait {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass1 => true
+        case _ => false
+      }
+    }
+    class SimpleClass2 extends SimpleTrait {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass2 => true
+        case _ => false
+      }
+    }
+
+    val testData = Array(new SimpleClass1, new SimpleClass1, new SimpleClass2)
+    runTests(testData)
+  }
+
+  @Test
+  def testAbstractSerialization(): Unit = {
+    abstract class SimpleAbstractBase {
+      def contains(x: String): Boolean
+    }
+    class SimpleClass1 extends SimpleAbstractBase {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass1 => true
+        case _ => false
+      }
+    }
+    class SimpleClass2 extends SimpleAbstractBase {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass2 => true
+        case _ => false
+      }
+    }
+
+    val testData = Array(new SimpleClass1, new SimpleClass1, new SimpleClass2)
+    runTests(testData)
+  }
+
+  @Test
   def testThrowableSerialization: Unit = {
     val a = List(new RuntimeException("Hello"), new RuntimeException("there"))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index b8f79fd..fc51c0c 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import org.junit.Assert._
 
 import org.apache.flink.api.common.typeutils.{TypeSerializer, 
SerializerTestInstance}
@@ -116,7 +117,9 @@ class ScalaSpecialTypesSerializerTestInstance[T](
     try {
       val serializer: TypeSerializer[T] = getSerializer
       val instance: T = serializer.createInstance
-      assertNotNull("The created instance must not be null.", instance)
+      if (!serializer.isInstanceOf[KryoSerializer[_]]) {
+        assertNotNull("The created instance must not be null.", instance)
+      }
       val tpe: Class[T] = getTypeClass
       assertNotNull("The test is corrupt: type class is null.", tpe)
       // We cannot check this because Collection Instances are not always of 
the type

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
index 587bbf3..84ff4a6 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -55,7 +55,7 @@ class TraversableSerializerTest {
     runTests(testData)
   }
 
-  @Test(expected = classOf[InvalidTypesException])
+  @Test
   def testSortedMap(): Unit = {
     // SortedSet is not supported right now.
     val testData = Array(SortedMap("Hello" -> 1, "World" -> 2), 
SortedMap("Foo" -> 42))
@@ -68,7 +68,7 @@ class TraversableSerializerTest {
     runTests(testData)
   }
 
-  @Test(expected = classOf[InvalidTypesException])
+  @Test
   def testSortedSet(): Unit = {
     // SortedSet is not supported right now.
     val testData = Array(SortedSet(1,2,3), SortedSet(2,3))
@@ -118,10 +118,9 @@ class TraversableSerializerTest {
   }
 
   @Test
-  @Ignore
   def testWithMixedPrimitives(): Unit = {
     // Does not work yet because the GenericTypeInfo used for the elements will
-    // have a typeClass of Object, and therefore not deserializer the elements 
correctly.
+    // have a typeClass of Object, and therefore not deserialize the elements 
correctly.
     // It does work when used in a Job, though. Because the Objects get cast to
     // the correct type in the user function.
     val testData = Array(Seq(1,1L,1d,true,"Hello"), Seq(2,2L,2d,false,"Ciao"))

Reply via email to