Repository: flink Updated Branches: refs/heads/master 71d05d744 -> 91f9bfc78
[FLINK-1458] Allow Interfaces and abstract types in TypeExtractor Kryo already supports them, so it was just a question of the TypeExtractor allowing them. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f9bfc7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f9bfc7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f9bfc7 Branch: refs/heads/master Commit: 91f9bfc782cc190738bfd3ad822348728a053b46 Parents: 71d05d7 Author: Aljoscha Krettek <[email protected]> Authored: Mon Feb 2 16:08:18 2015 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Feb 5 12:00:59 2015 +0100 ---------------------------------------------------------------------- .../flink/api/java/typeutils/TypeExtractor.java | 36 +++++++++----- .../java/type/extractor/TypeExtractorTest.java | 5 +- .../runtime/KryoGenericTypeSerializerTest.scala | 52 ++++++++++++++++++++ .../ScalaSpecialTypesSerializerTest.scala | 5 +- .../runtime/TraversableSerializerTest.scala | 7 ++- 5 files changed, 87 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 99292a6..124055c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -893,6 +893,10 @@ public class TypeExtractor { while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) { typeHierarchy.add(curT); curT = typeToClass(curT).getGenericSuperclass(); + + if (curT == null) { + break; + } } return curT; } @@ -1090,11 +1094,6 @@ public class TypeExtractor { ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { Validate.notNull(clazz); - // check for abstract classes or interfaces - if (!clazz.isPrimitive() && (Modifier.isInterface(clazz.getModifiers()) || (Modifier.isAbstract(clazz.getModifiers()) && !clazz.isArray()))) { - throw new InvalidTypesException("Interfaces and abstract classes are not valid types: " + clazz); - } - if (clazz.equals(Object.class)) { return new GenericTypeInfo<OUT>(clazz); } @@ -1153,6 +1152,11 @@ public class TypeExtractor { alreadySeen.add(clazz); + if (Modifier.isInterface(clazz.getModifiers())) { + // Interface has no members and is therefore not handled as POJO + return new GenericTypeInfo<OUT>(clazz); + } + if (clazz.equals(Class.class)) { // special case handling for Class, this should not be handled by the POJO logic return new GenericTypeInfo<OUT>(clazz); @@ -1228,10 +1232,10 @@ public class TypeExtractor { return true; } else { if(!hasGetter) { - LOG.warn("Class "+clazz+" does not contain a getter for field "+f.getName() ); + LOG.debug("Class "+clazz+" does not contain a getter for field "+f.getName() ); } if(!hasSetter) { - LOG.warn("Class "+clazz+" does not contain a setter for field "+f.getName() ); + LOG.debug("Class "+clazz+" does not contain a setter for field "+f.getName() ); } return false; } @@ -1251,11 +1255,16 @@ public class TypeExtractor { } List<Field> fields = getAllDeclaredFields(clazz); + if(fields.size() == 0) { + LOG.info("No fields detected for class " + clazz + ". Cannot be used as a PojoType. Will be handled as GenericType"); + return new GenericTypeInfo<OUT>(clazz); + } + List<PojoField> pojoFields = new ArrayList<PojoField>(); for (Field field : fields) { Type fieldType = field.getGenericType(); if(!isValidPojoField(field, clazz, typeHierarchy)) { - LOG.warn("Class "+clazz+" is not a valid POJO type"); + LOG.info("Class " + clazz + " is not a valid POJO type"); return null; } try { @@ -1281,7 +1290,7 @@ public class TypeExtractor { List<Method> methods = getAllDeclaredMethods(clazz); for (Method method : methods) { if (method.getName().equals("readObject") || method.getName().equals("writeObject")) { - LOG.warn("Class "+clazz+" contains custom serialization methods we do not call."); + LOG.info("Class "+clazz+" contains custom serialization methods we do not call."); return null; } } @@ -1291,8 +1300,13 @@ public class TypeExtractor { try { clazz.getDeclaredConstructor(); } catch (NoSuchMethodException e) { - LOG.warn("Class " + clazz + " must have a default constructor to be used as a POJO."); - return null; + if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) { + LOG.info("Class " + clazz + " is abstract or an interface, having a concrete " + + "type can increase performance."); + } else { + LOG.info("Class " + clazz + " must have a default constructor to be used as a POJO."); + return null; + } } // everything is checked, we return the pojo http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index f234ed5..748f81c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -45,6 +45,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple9; import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo; @@ -1090,7 +1091,7 @@ public class TypeExtractorTest { }; TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO, null, true); - Assert.assertTrue(ti instanceof MissingTypeInfo); + Assert.assertTrue(ti instanceof GenericTypeInfo); RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClass>() { private static final long serialVersionUID = 1L; @@ -1102,7 +1103,7 @@ public class TypeExtractorTest { }; TypeInformation<?> ti2 = TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO, null, true); - Assert.assertTrue(ti2 instanceof MissingTypeInfo); + Assert.assertTrue(ti2 instanceof GenericTypeInfo); } @SuppressWarnings({ "rawtypes", "unchecked" }) http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala index d61edff..c396f9f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -32,6 +32,58 @@ import com.esotericsoftware.kryo.io.Input class KryoGenericTypeSerializerTest { @Test + def testTraitSerialization(): Unit = { + trait SimpleTrait { + def contains(x: String): Boolean + } + class SimpleClass1 extends SimpleTrait { + def contains(x: String) = true + + override def equals(other: Any): Boolean = other match { + case other: SimpleClass1 => true + case _ => false + } + } + class SimpleClass2 extends SimpleTrait { + def contains(x: String) = true + + override def equals(other: Any): Boolean = other match { + case other: SimpleClass2 => true + case _ => false + } + } + + val testData = Array(new SimpleClass1, new SimpleClass1, new SimpleClass2) + runTests(testData) + } + + @Test + def testAbstractSerialization(): Unit = { + abstract class SimpleAbstractBase { + def contains(x: String): Boolean + } + class SimpleClass1 extends SimpleAbstractBase { + def contains(x: String) = true + + override def equals(other: Any): Boolean = other match { + case other: SimpleClass1 => true + case _ => false + } + } + class SimpleClass2 extends SimpleAbstractBase { + def contains(x: String) = true + + override def equals(other: Any): Boolean = other match { + case other: SimpleClass2 => true + case _ => false + } + } + + val testData = Array(new SimpleClass1, new SimpleClass1, new SimpleClass2) + runTests(testData) + } + + @Test def testThrowableSerialization: Unit = { val a = List(new RuntimeException("Hello"), new RuntimeException("there")) http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala index b8f79fd..fc51c0c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.java.typeutils.runtime.KryoSerializer import org.junit.Assert._ import org.apache.flink.api.common.typeutils.{TypeSerializer, SerializerTestInstance} @@ -116,7 +117,9 @@ class ScalaSpecialTypesSerializerTestInstance[T]( try { val serializer: TypeSerializer[T] = getSerializer val instance: T = serializer.createInstance - assertNotNull("The created instance must not be null.", instance) + if (!serializer.isInstanceOf[KryoSerializer[_]]) { + assertNotNull("The created instance must not be null.", instance) + } val tpe: Class[T] = getTypeClass assertNotNull("The test is corrupt: type class is null.", tpe) // We cannot check this because Collection Instances are not always of the type http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala index 587bbf3..84ff4a6 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala @@ -55,7 +55,7 @@ class TraversableSerializerTest { runTests(testData) } - @Test(expected = classOf[InvalidTypesException]) + @Test def testSortedMap(): Unit = { // SortedSet is not supported right now. val testData = Array(SortedMap("Hello" -> 1, "World" -> 2), SortedMap("Foo" -> 42)) @@ -68,7 +68,7 @@ class TraversableSerializerTest { runTests(testData) } - @Test(expected = classOf[InvalidTypesException]) + @Test def testSortedSet(): Unit = { // SortedSet is not supported right now. val testData = Array(SortedSet(1,2,3), SortedSet(2,3)) @@ -118,10 +118,9 @@ class TraversableSerializerTest { } @Test - @Ignore def testWithMixedPrimitives(): Unit = { // Does not work yet because the GenericTypeInfo used for the elements will - // have a typeClass of Object, and therefore not deserializer the elements correctly. + // have a typeClass of Object, and therefore not deserialize the elements correctly. // It does work when used in a Job, though. Because the Objects get cast to // the correct type in the user function. val testData = Array(Seq(1,1L,1d,true,"Hello"), Seq(2,2L,2d,false,"Ciao"))
