http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 51d4b0e..0aa6097 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -66,16 +67,19 @@ public class CoGroupOperatorCollectionTest implements Serializable { .build() ); - final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null); + ExecutionConfig executionConfig = new ExecutionConfig(); + final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig); { SumCoGroup udf1 = new SumCoGroup(); SumCoGroup udf2 = new SumCoGroup(); - + + executionConfig.disableObjectReuse(); List<Tuple2<String, Integer>> resultSafe = getCoGroupOperator(udf1) - .executeOnCollections(input1, input2, ctx, true); + .executeOnCollections(input1, input2, ctx, executionConfig); + executionConfig.enableObjectReuse(); List<Tuple2<String, Integer>> resultRegular = getCoGroupOperator(udf2) - .executeOnCollections(input1, input2, ctx, false); + .executeOnCollections(input1, input2, ctx, executionConfig); Assert.assertTrue(udf1.isClosed); Assert.assertTrue(udf2.isClosed); @@ -95,13 +99,15 @@ public class CoGroupOperatorCollectionTest implements Serializable { } { + executionConfig.disableObjectReuse(); List<Tuple2<String, Integer>> resultSafe = getCoGroupOperator(new SumCoGroup()) .executeOnCollections(Collections.<Tuple2<String, Integer>>emptyList(), - Collections.<Tuple2<String, Integer>>emptyList(), ctx, true); - + Collections.<Tuple2<String, Integer>>emptyList(), ctx, executionConfig); + + executionConfig.enableObjectReuse(); List<Tuple2<String, Integer>> resultRegular = getCoGroupOperator(new SumCoGroup()) .executeOnCollections(Collections.<Tuple2<String, Integer>>emptyList(), - Collections.<Tuple2<String, Integer>>emptyList(), ctx, false); + Collections.<Tuple2<String, Integer>>emptyList(), ctx, executionConfig); Assert.assertEquals(0, resultSafe.size()); Assert.assertEquals(0, resultRegular.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index cfca5aa..447c8c5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -79,9 +80,12 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String, Integer>("bar", 4))); - - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, null, true); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, false); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, null, executionConfig); + executionConfig.enableObjectReuse(); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular); @@ -155,8 +159,11 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String, Integer>("bar", 4))); - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + executionConfig.enableObjectReuse(); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index b4ef54f..1b38281 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -20,10 +20,10 @@ package org.apache.flink.api.common.operators.base; import static org.junit.Assert.*; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.BinaryOperatorInformation; -import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -101,8 +101,11 @@ public class JoinOperatorBaseTest implements Serializable { )); try { - List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), true); - List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), false); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig), executionConfig); + executionConfig.enableObjectReuse(); + List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig), executionConfig); assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe)); assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular)); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 90bbe41..4e1eebd 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -68,8 +69,11 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String, Integer>("bar", 4))); - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, null, true); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, false); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, null, executionConfig); + executionConfig.enableObjectReuse(); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular); @@ -132,8 +136,11 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String, Integer>("bar", 4))); - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + executionConfig.enableObjectReuse(); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index 118e707..cd495b5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -91,7 +92,7 @@ public class CollectionInputFormatTest { TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class); CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection, - info.createSerializer()); + info.createSerializer(new ExecutionConfig())); ByteArrayOutputStream buffer = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(buffer); @@ -168,7 +169,7 @@ public class CollectionInputFormatTest { try { List<String> inputCollection = Arrays.asList(data); - CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer()); + CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())); // serialize ByteArrayOutputStream baos = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java index 7dd1135..f6e3c2a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.io; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.BinaryInputFormat; import org.apache.flink.api.common.io.BinaryOutputFormat; import org.apache.flink.api.common.io.BlockInfo; @@ -51,7 +52,7 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In resultType = TypeExtractor.getForObject(getRecord(0)); - serializer = resultType.createSerializer(); + serializer = resultType.createSerializer(new ExecutionConfig()); } @Before http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index b96b3bc..dade55c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -1077,10 +1077,16 @@ public class TypeExtractorTest { public interface Testable {} - public abstract class AbstractClass {} + public static abstract class AbstractClassWithoutMember {} + + public static abstract class AbstractClassWithMember { + public int x; + } @Test - public void testAbstractAndInterfaceTypesException() { + public void testAbstractAndInterfaceTypes() { + + // interface RichMapFunction<String, ?> function = new RichMapFunction<String, Testable>() { private static final long serialVersionUID = 1L; @@ -1089,21 +1095,35 @@ public class TypeExtractorTest { return null; } }; - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO, null, true); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertTrue(ti instanceof GenericTypeInfo); + + // abstract class with out class member + RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClassWithoutMember>() { + private static final long serialVersionUID = 1L; + + @Override + public AbstractClassWithoutMember map(String value) throws Exception { + return null; + } + }; + + ti = TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO); Assert.assertTrue(ti instanceof GenericTypeInfo); - RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClass>() { + // abstract class with class member + RichMapFunction<String, ?> function3 = new RichMapFunction<String, AbstractClassWithMember>() { private static final long serialVersionUID = 1L; @Override - public AbstractClass map(String value) throws Exception { + public AbstractClassWithMember map(String value) throws Exception { return null; } }; - TypeInformation<?> ti2 = TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO, null, true); - Assert.assertTrue(ti2 instanceof GenericTypeInfo); + ti = TypeExtractor.getMapReturnTypes(function3, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertTrue(ti instanceof PojoTypeInfo); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -1727,7 +1747,7 @@ public class TypeExtractorTest { + "myField1=String,myField2=int" + ">[][][]")); Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<" - + "PojoType<org.apache.flink.api.java.type.extractor.TypeExtractorTest.CustomType, fields = [myField1: String, myField2: Integer]>" + + "PojoType<org.apache.flink.api.java.type.extractor.TypeExtractorTest$CustomType, fields = [myField1: String, myField2: Integer]>" + ">>>", ti.toString()); // generic array http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java index c71625a..33aa449 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java @@ -52,7 +52,8 @@ public class CompositeTypeTest { inNestedTuple3, BasicTypeInfo.INT_TYPE_INFO ); - private final PojoTypeInfo<?> pojoTypeInfo = ((PojoTypeInfo<?>)TypeExtractor.getForClass(MyPojo.class)); + private final PojoTypeInfo<?> pojoTypeInfo = ((PojoTypeInfo<?>) TypeExtractor.getForClass + (MyPojo.class)); private final TupleTypeInfo<?> pojoInTupleTypeInfo = new TupleTypeInfo<Tuple2<Integer, MyPojo>>(BasicTypeInfo.INT_TYPE_INFO, pojoTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java index c2f3737..eadf96d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java @@ -28,12 +28,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; import org.apache.flink.types.BooleanValue; import org.apache.flink.types.ByteValue; import org.apache.flink.types.CharValue; @@ -300,7 +294,7 @@ public class TypeInfoParserTest { // pojos ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>[][][]"); Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<" - + "PojoType<org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyPojo, fields = [basic: String]>" + + "PojoType<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo, fields = [basic: String]>" + ">>>", ti.toString()); // basic types http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java index a17f499..b317275 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java @@ -21,8 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.api.common.typeutils.ComparatorTestBase; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; -import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; import org.apache.flink.types.StringValue; public class CopyableValueComparatorTest extends ComparatorTestBase<StringValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java index c190727..d1163d5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java @@ -18,11 +18,12 @@ package org.apache.flink.api.java.typeutils.runtime; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest { @Override protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) { - return new KryoSerializer<T>(type); + return new KryoSerializer<T>(type, new ExecutionConfig()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java index 37dba4e..01c76d9 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java @@ -18,11 +18,12 @@ package org.apache.flink.api.java.typeutils.runtime; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { @Override protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new KryoSerializer<T>(type); + return new KryoSerializer<T>(type, new ExecutionConfig()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java index 5953599..8630d95 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime; import static org.junit.Assert.*; +import org.apache.flink.api.common.ExecutionConfig; import org.junit.Test; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -70,7 +71,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer @Override protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new KryoSerializer<T>(type); + return new KryoSerializer<T>(type, new ExecutionConfig()); } /** @@ -94,7 +95,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer // construct a memory target that is too small for the string TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000); - KryoSerializer<String> serializer = new KryoSerializer<String>(String.class); + KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig()); try { serializer.serialize(str, target); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java index 4c6b39f..b2ed219 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java @@ -27,6 +27,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemoryUtils; @@ -84,9 +88,12 @@ public class KryoVersusAvroMinibenchmark { System.out.println("Kryo serializer"); { final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000); - final KryoSerializer<MyType> serializer = new KryoSerializer<MyType>(MyType.class); - serializer.getKryo().register(Tuple2.class); - + ExecutionConfig conf = new ExecutionConfig(); + conf.registerKryoType(MyType.class); + conf.enableForceKryo(); + TypeInformation<MyType> typeInfo = new GenericTypeInfo<MyType>(MyType.class); + final TypeSerializer<MyType> serializer = typeInfo.createSerializer(conf); + long start = System.nanoTime(); for (int k = 0; k < NUM_ELEMENTS; k++) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java index 9d7ab61..7020d80 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java @@ -21,7 +21,10 @@ package org.apache.flink.api.java.typeutils.runtime; import java.util.Collection; import java.util.HashSet; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.joda.time.LocalDate; import org.junit.Test; @@ -40,15 +43,16 @@ public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializer b.add(new LocalDate(1L)); b.add(new LocalDate(2L)); - - KryoSerializer.registerSerializer(LocalDate.class, LocalDateSerializer.class); - + runTests(b); } @Override protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new KryoSerializer<T>(type); + ExecutionConfig conf = new ExecutionConfig(); + conf.registerKryoSerializer(LocalDate.class, LocalDateSerializer.class); + TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type); + return typeInfo.createSerializer(conf); } public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java index 22b6c76..1c97816 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils.runtime; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.SerializerTestInstance; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -35,7 +36,7 @@ public class MultidimensionalArraySerializerTest { String[][] array = new String[][]{{null,"b"},{"c","d"},{"e","f"},{"g","h"},null}; TypeInformation<String[][]> ti = TypeExtractor.getForClass(String[][].class); - SerializerTestInstance<String[][]> testInstance = new SerializerTestInstance<String[][]>(ti.createSerializer(), String[][].class, -1, array); + SerializerTestInstance<String[][]> testInstance = new SerializerTestInstance<String[][]>(ti.createSerializer(new ExecutionConfig()), String[][].class, -1, array); testInstance.testAll(); } @@ -44,7 +45,7 @@ public class MultidimensionalArraySerializerTest { int[][] array = new int[][]{{12,1},{48,42},{23,80},{484,849},{987,4}}; TypeInformation<int[][]> ti = TypeExtractor.getForClass(int[][].class); - SerializerTestInstance<int[][]> testInstance = new SerializerTestInstance<int[][]>(ti.createSerializer(), int[][].class, -1, array); + SerializerTestInstance<int[][]> testInstance = new SerializerTestInstance<int[][]>(ti.createSerializer(new ExecutionConfig()), int[][].class, -1, array); testInstance.testAll(); } @@ -73,13 +74,13 @@ public class MultidimensionalArraySerializerTest { Integer[][] array = new Integer[][]{{0,1}, null, {null, 42}}; TypeInformation<Integer[][]> ti = TypeExtractor.getForClass(Integer[][].class); - SerializerTestInstance<Integer[][]> testInstance = new SerializerTestInstance<Integer[][]>(ti.createSerializer(), Integer[][].class, -1, array); + SerializerTestInstance<Integer[][]> testInstance = new SerializerTestInstance<Integer[][]>(ti.createSerializer(new ExecutionConfig()), Integer[][].class, -1, array); testInstance.testAll(); MyPojo[][] array2 = new MyPojo[][]{{new MyPojo(null, 42), new MyPojo("test2", -1)}, {null, null}, null}; TypeInformation<MyPojo[][]> ti2 = TypeExtractor.getForClass(MyPojo[][].class); - SerializerTestInstance<MyPojo[][]> testInstance2 = new SerializerTestInstance<MyPojo[][]>(ti2.createSerializer(), MyPojo[][].class, -1, array2); + SerializerTestInstance<MyPojo[][]> testInstance2 = new SerializerTestInstance<MyPojo[][]>(ti2.createSerializer(new ExecutionConfig()), MyPojo[][].class, -1, array2); testInstance2.testAll(); } @@ -112,7 +113,7 @@ public class MultidimensionalArraySerializerTest { }; TypeInformation ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.runtime.MultidimensionalArraySerializerTest$MyGenericPojo<field=String[][]>[][]"); - SerializerTestInstance testInstance = new SerializerTestInstance(ti.createSerializer(), MyGenericPojo[][].class, -1, (Object) array); + SerializerTestInstance testInstance = new SerializerTestInstance(ti.createSerializer(new ExecutionConfig()), MyGenericPojo[][].class, -1, (Object) array); testInstance.testAll(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java index e53f48a..1baf443 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime; import java.util.Arrays; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.ComparatorTestBase; import org.apache.flink.api.common.typeutils.CompositeType; @@ -47,12 +48,12 @@ public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType); boolean[] orders = new boolean[keys.getNumberOfKeyFields()]; Arrays.fill(orders, ascending); - return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0); + return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new ExecutionConfig()); } @Override protected TypeSerializer<PojoContainingTuple> createSerializer() { - return type.createSerializer(); + return type.createSerializer(new ExecutionConfig()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java index a176178..d405412 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils.runtime; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -27,6 +28,6 @@ public class PojoGenericTypeSerializerTest extends AbstractGenericTypeSerializer @Override protected <T> TypeSerializer<T> createSerializer(Class<T> type) { TypeInformation<T> typeInfo = TypeExtractor.getForClass(type); - return typeInfo.createSerializer(); + return typeInfo.createSerializer(new ExecutionConfig()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index 006625e..1fa7163 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -40,14 +41,14 @@ import org.junit.Test; import com.google.common.base.Objects; /** - * A test for the {@link org.apache.flink.api.java.typeutils.runtime.PojoSerializer}. + * A test for the {@link PojoSerializer}. */ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserClass> { private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); @Override protected TypeSerializer<TestUserClass> createSerializer() { - TypeSerializer<TestUserClass> serializer = type.createSerializer(); + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); assert(serializer instanceof PojoSerializer); return serializer; } @@ -199,14 +200,14 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te pType.getFlatFields("nestedClass.dumm2", 0, result); int[] fields = new int[1]; // see below fields[0] = result.get(0).getPosition(); - TypeComparator<TestUserClass> pojoComp = pType.createComparator( fields, new boolean[]{true}, 0); + TypeComparator<TestUserClass> pojoComp = pType.createComparator( fields, new boolean[]{true}, 0, new ExecutionConfig()); TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, new int[] {1,2,3}, new NestedTestUserClass(1, "haha", 4d, new int[] {5,4,3})); int pHash = pojoComp.hash(pojoTestRecord); Tuple1<String> tupleTest = new Tuple1<String>("haha"); TupleTypeInfo<Tuple1<String>> tType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor.getForObject(tupleTest); - TypeComparator<Tuple1<String>> tupleComp = tType.createComparator(new int[] {0}, new boolean[] {true}, 0); + TypeComparator<Tuple1<String>> tupleComp = tType.createComparator(new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig()); int tHash = tupleComp.hash(tupleTest); @@ -223,12 +224,12 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te e.printStackTrace(); Assert.fail("Keys must be compatible: "+e.getMessage()); } - TypeComparator<TestUserClass> multiPojoComp = pType.createComparator( expressKey.computeLogicalKeyPositions(), new boolean[]{true, true, true}, 0); + TypeComparator<TestUserClass> multiPojoComp = pType.createComparator( expressKey.computeLogicalKeyPositions(), new boolean[]{true, true, true}, 0, new ExecutionConfig()); int multiPojoHash = multiPojoComp.hash(pojoTestRecord); // pojo order is: dumm2 (str), dumm1 (int), dumm3 (double). - TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp = multiTupleType.createComparator(fieldKey.computeLogicalKeyPositions(), new boolean[] {true, true,true}, 0); + TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp = multiTupleType.createComparator(fieldKey.computeLogicalKeyPositions(), new boolean[] {true, true,true}, 0, new ExecutionConfig()); int multiTupleHash = multiTupleComp.hash(multiTupleTest); Assert.assertTrue("The hashing for tuples and pojos must be the same, so that they are mixable. Also for those with multiple key fields", multiPojoHash == multiTupleHash); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java new file mode 100644 index 0000000..3a03683 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.junit.Assert; + +import java.util.Arrays; + + +public class PojoSubclassComparatorTest extends ComparatorTestBase<PojoContainingTuple> { + TypeInformation<PojoContainingTuple> type = TypeExtractor.getForClass(PojoContainingTuple.class); + + PojoContainingTuple[] data = new PojoContainingTuple[]{ + new Subclass(1, 1L, 1L, 17L), + new Subclass(2, 2L, 2L, 42L), + new Subclass(8519, 85190L, 85190L, 117L), + new Subclass(8520, 85191L, 85191L, 93L), + }; + + @Override + protected TypeComparator<PojoContainingTuple> createComparator(boolean ascending) { + Assert.assertTrue(type instanceof CompositeType); + CompositeType<PojoContainingTuple> cType = (CompositeType<PojoContainingTuple>) type; + ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType); + boolean[] orders = new boolean[keys.getNumberOfKeyFields()]; + Arrays.fill(orders, ascending); + return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new ExecutionConfig()); + } + + @Override + protected TypeSerializer<PojoContainingTuple> createSerializer() { + return type.createSerializer(new ExecutionConfig()); + } + + @Override + protected PojoContainingTuple[] getSortedTestData() { + return data; + } + + public static class Subclass extends PojoContainingTuple { + + public long additionalField; + + public Subclass() { + } + + public Subclass(int i, long l1, long l2, long additionalField) { + super(i, l1, l2); + this.additionalField = additionalField; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java new file mode 100644 index 0000000..8c61a19 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import com.google.common.base.Objects; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.junit.Test; + +import java.util.Random; + +/** + * A test for the {@link PojoSerializer}. + */ +public class PojoSubclassSerializerTest extends SerializerTestBase<PojoSubclassSerializerTest.TestUserClassBase> { + private TypeInformation<TestUserClassBase> type = TypeExtractor.getForClass(TestUserClassBase.class); + + @Override + protected TypeSerializer<TestUserClassBase> createSerializer() { + // only register one of the three child classes, the third child class is NO POJO + ExecutionConfig conf = new ExecutionConfig(); + conf.registerPojoType(TestUserClass1.class); + TypeSerializer<TestUserClassBase> serializer = type.createSerializer(conf); + assert(serializer instanceof PojoSerializer); + return serializer; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<TestUserClassBase> getTypeClass() { + return TestUserClassBase.class; + } + + @Override + protected TestUserClassBase[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserClassBase[]{ + new TestUserClass1(rnd.nextInt(), "foo", rnd.nextLong()), + new TestUserClass2(rnd.nextInt(), "bar", rnd.nextFloat()), + new TestUserClass3(rnd.nextInt(), "bar", rnd.nextFloat()) + }; + + } + + @Override + @Test + public void testInstantiate() { + // don't do anything, since the PojoSerializer with subclass will return null + } + + // User code class for testing the serializer + public static abstract class TestUserClassBase { + public int dumm1; + public String dumm2; + + + public TestUserClassBase() { + } + + public TestUserClassBase(int dumm1, String dumm2) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + } + + @Override + public int hashCode() { + return Objects.hashCode(dumm1, dumm2); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClassBase)) { + return false; + } + TestUserClassBase otherTUC = (TestUserClassBase) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + return true; + } + } + + public static class TestUserClass1 extends TestUserClassBase { + public long dumm3; + + public TestUserClass1() { + } + + public TestUserClass1(int dumm1, String dumm2, long dumm3) { + super(dumm1, dumm2); + this.dumm3 = dumm3; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClass1)) { + return false; + } + TestUserClass1 otherTUC = (TestUserClass1) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm3 != otherTUC.dumm3) { + return false; + } + return true; + } + } + + public static class TestUserClass2 extends TestUserClassBase { + public float dumm4; + + public TestUserClass2() { + } + + public TestUserClass2(int dumm1, String dumm2, float dumm4) { + super(dumm1, dumm2); + this.dumm4 = dumm4; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClass2)) { + return false; + } + TestUserClass2 otherTUC = (TestUserClass2) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm4 != otherTUC.dumm4) { + return false; + } + return true; + } + } + + public static class TestUserClass3 extends TestUserClassBase { + public float dumm4; + + public TestUserClass3(int dumm1, String dumm2, float dumm4) { + super(dumm1, dumm2); + this.dumm4 = dumm4; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClass3)) { + return false; + } + TestUserClass3 otherTUC = (TestUserClass3) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm4 != otherTUC.dumm4) { + return false; + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java new file mode 100644 index 0000000..efb1b9b --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import com.google.common.base.Objects; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.junit.Test; + +import java.util.Random; + +/** + * Testing the serialization of classes which are subclasses of a class that implements an interface. + */ +public class SubclassFromInterfaceSerializerTest extends SerializerTestBase<SubclassFromInterfaceSerializerTest.TestUserInterface> { + private TypeInformation<TestUserInterface> type = TypeExtractor.getForClass(TestUserInterface.class); + + @Override + protected TypeSerializer<TestUserInterface> createSerializer() { + // only register one of the two child classes + ExecutionConfig conf = new ExecutionConfig(); + conf.registerPojoType(TestUserClass2.class); + TypeSerializer<TestUserInterface> serializer = type.createSerializer(conf); + assert(serializer instanceof KryoSerializer); + return serializer; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<TestUserInterface> getTypeClass() { + return TestUserInterface.class; + } + + @Override + protected TestUserInterface[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserInterface[]{ + new TestUserClass1(rnd.nextInt(), "foo", rnd.nextLong()), + new TestUserClass2(rnd.nextInt(), "bar", rnd.nextFloat()) + }; + + } + + @Override + @Test + public void testInstantiate() { + // don't do anything, since the PojoSerializer with subclass will return null + } + + public interface TestUserInterface {} + + // User code class for testing the serializer + public static class TestUserClassBase implements TestUserInterface { + public int dumm1; + public String dumm2; + + + public TestUserClassBase() { + } + + public TestUserClassBase(int dumm1, String dumm2) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + } + + @Override + public int hashCode() { + return Objects.hashCode(dumm1, dumm2); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClassBase)) { + return false; + } + TestUserClassBase otherTUC = (TestUserClassBase) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + return true; + } + } + + public static class TestUserClass1 extends TestUserClassBase { + public long dumm3; + + public TestUserClass1() { + } + + public TestUserClass1(int dumm1, String dumm2, long dumm3) { + super(dumm1, dumm2); + this.dumm3 = dumm3; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClass1)) { + return false; + } + TestUserClass1 otherTUC = (TestUserClass1) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm3 != otherTUC.dumm3) { + return false; + } + return true; + } + } + + public static class TestUserClass2 extends TestUserClassBase { + public float dumm4; + + public TestUserClass2() { + } + + public TestUserClass2(int dumm1, String dumm2, float dumm4) { + super(dumm1, dumm2); + this.dumm4 = dumm4; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClass2)) { + return false; + } + TestUserClass2 otherTUC = (TestUserClass2) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm4 != otherTUC.dumm4) { + return false; + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java index 9f1c7b8..cfc4914 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java @@ -26,8 +26,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongComparator; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java index 8d13ab7..e5a0e6c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java @@ -27,8 +27,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongComparator; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java index 82b8b4e..a1e6c40 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java @@ -27,8 +27,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongComparator; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java index 25de450..b5c0c1f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java @@ -25,8 +25,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongComparator; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java index 2a5fc29..793a2f4 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java @@ -26,8 +26,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongComparator; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java index b2218ad..8cdee9b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java @@ -25,8 +25,6 @@ import org.apache.flink.api.common.typeutils.base.IntComparator; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java index fe152a3..06c292f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java @@ -26,8 +26,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringComparator; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java index b92d825..d823a29 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java @@ -27,8 +27,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringComparator; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java index a701a82..96f8306 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java @@ -22,6 +22,7 @@ package org.apache.flink.api.java.typeutils.runtime; import java.util.ArrayList; import java.util.Random; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; @@ -209,7 +210,7 @@ public class TupleSerializerTest { private <T extends Tuple> void runTests(T... instances) { try { TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]); - TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(); + TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig()); Class<T> tupleClass = tupleTypeInfo.getTypeClass(); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java index e710187..cf9874d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java @@ -21,8 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.api.common.typeutils.ComparatorTestBase; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.ValueComparator; -import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; import org.apache.flink.types.StringValue; public class ValueComparatorTest extends ComparatorTestBase<StringValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java index ce1147a..f5a90b7 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java @@ -21,8 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.api.common.typeutils.ComparatorTestBase; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.WritableComparator; -import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java index 01349bf..557c0e4 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java @@ -18,10 +18,10 @@ package org.apache.flink.api.java.typeutils.runtime; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.SerializerTestInstance; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; import org.junit.Test; public class WritableSerializerTest { @@ -41,7 +41,7 @@ public class WritableSerializerTest { }; WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]); - WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(); + WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig()); SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java index 49a3fe5..6edd2f7 100644 --- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java +++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java index 4f10683..d213069 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.iterative.task; +import org.apache.flink.api.common.ExecutionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.aggregators.Aggregator; @@ -165,7 +166,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader()); + env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig()); } // -------------------------------------------------------------------------------------------- @@ -355,8 +356,8 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext { - public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader); + public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index ff1b272..42bbe3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -18,14 +18,21 @@ package org.apache.flink.runtime.jobgraph.tasks; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract base class for every task class in Flink. */ public abstract class AbstractInvokable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokable.class); + + /** * The environment assigned to this invokable. */ @@ -111,6 +118,26 @@ public abstract class AbstractInvokable { } /** + * Returns the global ExecutionConfig. + */ + public ExecutionConfig getExecutionConfig() { + try { + ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( + getJobConfiguration(), + ExecutionConfig.CONFIG_KEY, + this.getClass().getClassLoader()); + if (c != null) { + return c; + } else { + return new ExecutionConfig(); + } + } catch (Exception e) { + LOG.warn("Could not load ExecutionConfig from Environment, returning default ExecutionConfig: {}", e); + return new ExecutionConfig(); + } + } + + /** * This method is called when a task is canceled either as a result of a user abort or an execution failure. It can * be overwritten to respond to shut down the user code properly. * http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 330f438..01ab533 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -74,24 +74,6 @@ public class DataSourceTask<OT> extends AbstractInvokable { // cancel flag private volatile boolean taskCanceled = false; - private ExecutionConfig getExecutionConfig() { - ExecutionConfig executionConfig = new ExecutionConfig(); - try { - ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( - getJobConfiguration(), - ExecutionConfig.CONFIG_KEY, - this.getClass().getClassLoader()); - if (c != null) { - executionConfig = c; - } - } catch (IOException e) { - throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); - } - return executionConfig; - } - @Override public void registerInputOutput() { initInputFormat(); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java index f6d2f4a..ffe09cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java @@ -45,8 +45,6 @@ public interface PactTaskContext<S, OT> { TaskConfig getTaskConfig(); - ExecutionConfig getExecutionConfig(); - ClassLoader getUserCodeClassLoader(); MemoryManager getMemoryManager(); @@ -60,7 +58,9 @@ public interface PactTaskContext<S, OT> { <X> TypeComparator<X> getDriverComparator(int index); S getStub(); - + + ExecutionConfig getExecutionConfig(); + Collector<OT> getOutputCollector(); AbstractInvokable getOwningNepheleTask(); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 6b6918d..9118400 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -1068,7 +1068,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), env.getCopyTask()); + env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), env.getCopyTask()); } // -------------------------------------------------------------------------------------------- @@ -1081,23 +1081,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i } @Override - public ExecutionConfig getExecutionConfig() { - try { - ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( - getOwningNepheleTask().getJobConfiguration(), - ExecutionConfig.CONFIG_KEY, - this.getClass().getClassLoader()); - if (c != null) { - return c; - } else { - return new ExecutionConfig(); - } - } catch (Exception e) { - throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); - } - } - - @Override public MemoryManager getMemoryManager() { return getEnvironment().getMemoryManager(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 838286e..c84d888 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -62,7 +62,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { } else { Environment env = parent.getEnvironment(); this.udfContext = new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask()); + env.getIndexInSubtaskGroup(), userCodeClassLoader, parent.getExecutionConfig(), env.getCopyTask()); } this.executionConfig = executionConfig; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java index 0080d63..ce63021 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -102,6 +103,8 @@ public class LargeRecordHandler<T> { private volatile boolean closed; + private final ExecutionConfig executionConfig; + // -------------------------------------------------------------------------------------------- public LargeRecordHandler(TypeSerializer<T> serializer, TypeComparator<T> comparator, @@ -115,7 +118,9 @@ public class LargeRecordHandler<T> { this.memory = checkNotNull(memory); this.memoryOwner = checkNotNull(memoryOwner); this.maxFilehandles = maxFilehandles; - + + this.executionConfig = memoryOwner.getExecutionConfig(); + checkArgument(maxFilehandles >= 2); } @@ -374,13 +379,13 @@ public class LargeRecordHandler<T> { // -------------------------------------------------------------------------------------------- - private static TypeSerializer<Object> createSerializer(Object key, int pos) { + private TypeSerializer<Object> createSerializer(Object key, int pos) { if (key == null) { throw new NullKeyFieldException(pos); } try { TypeInformation<Object> info = TypeExtractor.getForObject(key); - return info.createSerializer(); + return info.createSerializer(executionConfig); } catch (Throwable t) { throw new RuntimeException("Could not create key serializer for type " + key); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java index aa7aec6..e841066 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.FutureTask; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext; @@ -40,12 +41,12 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>(); - public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader); + public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig); } - public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, cpTasks); + public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks); }