http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java index 5163801..56e2680 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeinfo; import java.util.HashMap; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer; @@ -86,7 +87,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> { } @Override - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { return this.serializer; }
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java index 0f86486..0ed5afd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java @@ -18,10 +18,13 @@ package org.apache.flink.api.common.typeinfo; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; -public abstract class TypeInformation<T> { - +import java.io.Serializable; + +public abstract class TypeInformation<T> implements Serializable { + public abstract boolean isBasicType(); public abstract boolean isTupleType(); @@ -32,7 +35,7 @@ public abstract class TypeInformation<T> { public abstract boolean isKeyType(); - public abstract TypeSerializer<T> createSerializer(); + public abstract TypeSerializer<T> createSerializer(ExecutionConfig config); /** * @return The number of fields in this type, including its sub-fields (for compsite types) http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java index c3ea0e4..f3e4cfa 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeutils; 
import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -91,7 +92,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> { /** * Get the actual comparator we've initialized. */ - protected abstract TypeComparator<T> getNewComparator(); + protected abstract TypeComparator<T> getNewComparator(ExecutionConfig config); /** @@ -99,7 +100,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> { * to create the actual comparators * @return The comparator */ - public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset) { + public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) { initializeNewComparator(logicalKeyFields.length); for(int logicalKeyFieldIndex = 0; logicalKeyFieldIndex < logicalKeyFields.length; logicalKeyFieldIndex++) { @@ -110,13 +111,13 @@ public abstract class CompositeType<T> extends TypeInformation<T> { if(localFieldType instanceof AtomicType && logicalField == logicalKeyField) { // we found an atomic key --> create comparator - addCompareField(localFieldId, ((AtomicType<?>) localFieldType).createComparator(orders[logicalKeyFieldIndex]) ); + addCompareField(localFieldId, ((AtomicType<?>) localFieldType).createComparator(orders[logicalKeyFieldIndex], config) ); } else if(localFieldType instanceof CompositeType && // must be a composite type ( logicalField <= logicalKeyField //check if keyField can be at or behind the current logicalField && logicalKeyField <= logicalField + (localFieldType.getTotalFields() - 1) ) // check if logical field + lookahead could contain our key ) { // we found a compositeType that is containing the logicalKeyField we are looking for --> create comparator - addCompareField(localFieldId, ((CompositeType<?>) 
localFieldType).createComparator(new int[] {logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField)); + addCompareField(localFieldId, ((CompositeType<?>) localFieldType).createComparator(new int[] {logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField, config)); } // maintain logicalField @@ -127,7 +128,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> { logicalField++; } } - return getNewComparator(); + return getNewComparator(config); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java index 7d701dc..c77a1b6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.junit.Test; @@ -33,7 +34,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableNotFound() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); try { ctx.getBroadcastVariable("some name"); @@ -63,7 +64,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableSimple() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader()); 
+ RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4)); ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0)); @@ -97,7 +98,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -122,7 +123,7 @@ public class RuntimeUDFContextTest { @Test public void testResetBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -145,7 +146,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializerAndMismatch() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 9a0a2b5..d231455 100644 --- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -63,9 +64,15 @@ public class FlatMapOperatorCollectionTest implements Serializable { } private void testExecuteOnCollection(FlatMapFunction<String, String> udf, List<String> input, boolean mutableSafe) throws Exception { + ExecutionConfig executionConfig = new ExecutionConfig(); + if (mutableSafe) { + executionConfig.disableObjectReuse(); + } else { + executionConfig.enableObjectReuse(); + } // run on collections final List<String> result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null), mutableSafe); + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig), executionConfig); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index feb2223..54975b4 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -20,6 +20,7 @@ package 
org.apache.flink.api.common.operators.base; import static org.junit.Assert.*; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.RichFlatJoinFunction; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; @@ -60,8 +61,11 @@ public class JoinOperatorBaseTest implements Serializable { List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6 ,6)); try { - List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, null, true); - List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, null, false); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, null, executionConfig); + executionConfig.enableObjectReuse(); + List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, null, executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); @@ -110,8 +114,11 @@ public class JoinOperatorBaseTest implements Serializable { try { - List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), true); - List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), false); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + executionConfig.enableObjectReuse(); + List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); 
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index fd23d40..8e07f07 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -52,8 +53,12 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), "TestMapper"); List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6")); - List<Integer> resultMutableSafe = op.executeOnCollections(input, null, true); - List<Integer> resultRegular = op.executeOnCollections(input, null, false); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Integer> resultMutableSafe = op.executeOnCollections(input, null, executionConfig); + executionConfig.enableObjectReuse(); + List<Integer> resultRegular = op.executeOnCollections(input, null, executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); @@ -97,8 +102,11 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation<String, 
Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6")); - List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); - List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + executionConfig.enableObjectReuse(); + List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 50c6b98..61ba359 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.util.Collector; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RichMapPartitionFunction; @@ -74,9 +75,12 @@ public class 
PartitionMapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6")); - - List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); - List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + executionConfig.enableObjectReuse(); + List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index 1531ae6..59bea0c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -358,7 +358,7 @@ public abstract class SerializerTestBase<T> { try { ser2 = SerializationUtils.clone(ser1); } catch (SerializationException e) { - fail("The serializer is not serializable."); + fail("The serializer is not serializable: " + e); return; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java index 2d57490..c61d624 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java @@ -29,7 +29,7 @@ public class CollectionEnvironment extends ExecutionEnvironment { Plan p = createProgramPlan(jobName); // We need to reverse here. Object-Reuse enabled, means safe mode is disabled. - CollectionExecutor exec = new CollectionExecutor(!getConfig().isObjectReuseEnabled()); + CollectionExecutor exec = new CollectionExecutor(getConfig()); return exec.execute(p); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index e5bb1fd..f2091e2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -69,8 +69,10 @@ import org.apache.flink.api.java.operators.UnionOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.MissingTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileSystem.WriteMode; @@ -92,7 +94,7 @@ import com.google.common.base.Preconditions; */ public abstract class DataSet<T> { - private final ExecutionEnvironment context; + protected final ExecutionEnvironment context; // NOTE: the type must not be accessed directly, but only via getType() private TypeInformation<T> type; @@ -109,7 +111,11 @@ public abstract class DataSet<T> { } this.context = context; - this.type = typeInfo; + if (typeInfo instanceof PojoTypeInfo && context.getConfig().isForceKryoEnabled()) { + this.type = new GenericTypeInfo<T>(typeInfo.getTypeClass()); + } else { + this.type = typeInfo; + } } /** @@ -1300,7 +1306,7 @@ public abstract class DataSet<T> { // configure the type if needed if (outputFormat instanceof InputTypeConfigurable) { - ((InputTypeConfigurable) outputFormat).setInputType(getType()); + ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig() ); } DataSink<T> sink = new DataSink<T>(this, outputFormat, getType()); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index af8095c..1105ab9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -54,10 +54,10 @@ import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.operators.OperatorTranslation; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import org.apache.flink.api.java.typeutils.runtime.KryoSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; @@ -103,8 +103,7 @@ public abstract class ExecutionEnvironment { private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>(); private ExecutionConfig config = new ExecutionConfig(); - - + // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- @@ -117,14 +116,6 @@ public abstract class ExecutionEnvironment { } /** - * Sets the config object. - */ - public void setConfig(ExecutionConfig config) { - Validate.notNull(config); - this.config = config; - } - - /** * Gets the config object. */ public ExecutionConfig getConfig() { @@ -208,7 +199,7 @@ public abstract class ExecutionEnvironment { // -------------------------------------------------------------------------------------------- // Registry for types and serializers // -------------------------------------------------------------------------------------------- - + /** * Registers the given Serializer as a default serializer for the given type at the * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}. @@ -220,11 +211,7 @@ public abstract class ExecutionEnvironment { * @param serializer The serializer to use. 
*/ public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) { - if (type == null || serializer == null) { - throw new NullPointerException("Cannot register null class or serializer."); - } - - KryoSerializer.registerSerializer(type, serializer); + config.registerKryoSerializer(type, serializer); } /** @@ -235,11 +222,7 @@ public abstract class ExecutionEnvironment { * @param serializerClass The class of the serializer to use. */ public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { - if (type == null || serializerClass == null) { - throw new NullPointerException("Cannot register null class or serializer."); - } - - KryoSerializer.registerSerializer(type, serializerClass); + config.registerKryoSerializer(type, serializerClass); } /** @@ -254,10 +237,16 @@ public abstract class ExecutionEnvironment { if (type == null) { throw new NullPointerException("Cannot register null type class."); } - - KryoSerializer.registerType(type); + + TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type); + + if (typeInfo instanceof PojoTypeInfo) { + config.registerPojoType(type); + } else { + config.registerKryoType(type); + } } - + // -------------------------------------------------------------------------------------------- // Data set creations // -------------------------------------------------------------------------------------------- @@ -555,7 +544,7 @@ public abstract class ExecutionEnvironment { TypeInformation<X> type = TypeExtractor.getForObject(firstValue); CollectionInputFormat.checkCollection(data, type.getTypeClass()); - return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, Utils.getCallLocationName()); + return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer(config)), type, Utils.getCallLocationName()); } /** @@ -582,7 +571,7 @@ public abstract class ExecutionEnvironment { private <X> DataSource<X> 
fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) { CollectionInputFormat.checkCollection(data, type.getTypeClass()); - return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, callLocationName); + return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer(config)), type, callLocationName); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java index 8e0ff5e..9239e25 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; +import org.apache.flink.api.common.ExecutionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.InvalidProgramException; @@ -216,7 +217,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem * is in fact a tuple type. 
*/ @Override - public void setInputType(TypeInformation<?> type) { + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { if (!type.isTupleType()) { throw new InvalidProgramException("The " + CsvOutputFormat.class.getSimpleName() + " can only be used to write tuple data sets."); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java index 4b2e0ed..23c19f7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -89,7 +90,7 @@ public class LocalCollectionOutputFormat<T> implements OutputFormat<T>, InputTyp @Override @SuppressWarnings("unchecked") - public void setInputType(TypeInformation<?> type) { - this.typeSerializer = (TypeSerializer<T>)type.createSerializer(); + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + this.typeSerializer = (TypeSerializer<T>)type.createSerializer(executionConfig); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java index 8e92c27..7d0dfaa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.io; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.BinaryInputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -40,7 +41,8 @@ public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> implement public TypeSerializerInputFormat(TypeInformation<T> resultType) { this.resultType = resultType; - this.serializer = resultType.createSerializer(); + // TODO: pass the user's ExecutionConfig through to this constructor instead of creating a default one + this.serializer = resultType.createSerializer(new ExecutionConfig()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java index a9b93ee..0c9ed80 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.io; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.BinaryOutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -52,7 +53,7 @@ public class TypeSerializerOutputFormat<T> 
extends BinaryOutputFormat<T> impleme @Override @SuppressWarnings("unchecked") - public void setInputType(TypeInformation<?> type) { - serializer = (TypeSerializer<T>) type.createSerializer(); + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + serializer = (TypeSerializer<T>) type.createSerializer(executionConfig); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java index 9ea28f7..acfb47a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java @@ -226,7 +226,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, TupleTypeInfo<OUT> returnType, CrossHint hint) { super(input1, input2, - new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), + new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()), returnType, hint, "unknown"); crossProjection = null; @@ -236,7 +236,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, TupleTypeInfo<OUT> returnType, CrossProjection<I1, I2> crossProjection, CrossHint hint) { super(input1, input2, - new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), + new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()), returnType, hint, "unknown"); this.crossProjection = crossProjection; 
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index e60314f..8b61779 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -633,7 +633,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) { super(input1, input2, keys1, keys2, - new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), + new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()), returnType, hint, Utils.getCallLocationName(4)); // We need to use the 4th element in the stack because the call comes through .types(). 
@@ -642,7 +642,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType, JoinProjection<I1, I2> joinProj) { super(input1, input2, keys1, keys2, - new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), + new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()), returnType, hint, Utils.getCallLocationName(4)); this.joinProj = joinProj; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java index bddef8f..16d9ff3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java @@ -49,7 +49,7 @@ public class ProjectOperator<IN, OUT extends Tuple> protected final int[] fields; private Projection<IN> proj; - + public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType) { super(input, returnType); @@ -68,7 +68,7 @@ public class ProjectOperator<IN, OUT extends Tuple> protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? 
getName() : "Projection " + Arrays.toString(fields); // create operator - PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType()); + PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType(), context.getConfig()); // set input ppo.setInput(input); // set dop http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java index 1b452da..959b929 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.operators.translation; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -27,8 +28,8 @@ import org.apache.flink.api.java.tuple.Tuple; public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> { - public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType) { - super(new MapProjector<T, R>(fields, outType.createSerializer().createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name); + public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType, ExecutionConfig executionConfig) { + super(new MapProjector<T, R>(fields, 
outType.createSerializer(executionConfig).createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name); } public static final class MapProjector<T, R extends Tuple> http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java index c2a0b7f..291acce 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -40,7 +41,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen } @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending) { + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { return new EnumComparator<T>(sortOrderAscending); } @@ -75,7 +76,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen } @Override - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { return new EnumSerializer<T>(typeClass); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index 5bc6cb9..cb0ac31 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -25,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator; import org.apache.flink.api.java.typeutils.runtime.KryoSerializer; - public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> { private final Class<T> typeClass; @@ -33,7 +33,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType public GenericTypeInfo(Class<T> typeClass) { this.typeClass = typeClass; } - + @Override public boolean isBasicType() { return false; @@ -65,16 +65,16 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType } @Override - public TypeSerializer<T> createSerializer() { - return new KryoSerializer<T>(this.typeClass); + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + return new KryoSerializer<T>(this.typeClass, config); } @SuppressWarnings("unchecked") @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending) { + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { if (isKeyType()) { @SuppressWarnings("rawtypes") - GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, createSerializer(), this.typeClass); + GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, 
createSerializer(executionConfig), this.typeClass); return (TypeComparator<T>) comparator; } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java index e26326e..f8b4247 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java @@ -19,11 +19,13 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; /** * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured - * with the data type they will operate on. The method {@link #setInputType(TypeInformation)} will be + * with the data type they will operate on. The method {@link #setInputType(org.apache.flink.api + * .common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)} will be * called when the output format is used with an output method such as * {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}. */ @@ -32,8 +34,9 @@ public interface InputTypeConfigurable { /** * Method that is called on an {@link org.apache.flink.api.common.io.OutputFormat} when it is passed to * the DataSet's output method. May be used to configures the output format based on the data type. - * + * * @param type The data type of the input. 
+ * @param executionConfig */ - void setInputType(TypeInformation<?> type); + void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java index 10ab02f..f0da264 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -79,7 +80,7 @@ public class MissingTypeInfo extends TypeInformation<InvalidTypesException> { } @Override - public TypeSerializer<InvalidTypesException> createSerializer() { + public TypeSerializer<InvalidTypesException> createSerializer(ExecutionConfig executionConfig) { throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java index 664350e..b4bc86b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java @@ -22,6 +22,7 @@ import java.lang.reflect.GenericArrayType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -106,15 +107,15 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { @SuppressWarnings("unchecked") @Override - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { // use raw type for serializer if generic array type if (this.componentType instanceof GenericArrayType) { ParameterizedType paramType = (ParameterizedType) ((GenericArrayType) this.componentType).getGenericComponentType(); return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) paramType.getRawType(), - this.componentInfo.createSerializer()); + this.componentInfo.createSerializer(executionConfig)); } else { - return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) this.componentType, this.componentInfo.createSerializer()); + return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) this.componentType, this.componentInfo.createSerializer(executionConfig)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index fb5ca44..503b6f2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -28,13 +28,14 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.typeutils.runtime.PojoComparator; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import com.google.common.base.Joiner; @@ -43,7 +44,7 @@ import com.google.common.base.Joiner; * TypeInformation for arbitrary (they have to be java-beans-style) java objects (what we call POJO). * */ -public class PojoTypeInfo<T> extends CompositeType<T>{ +public class PojoTypeInfo<T> extends CompositeType<T> { private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; @@ -269,7 +270,7 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ } @Override - protected TypeComparator<T> getNewComparator() { + protected TypeComparator<T> getNewComparator(ExecutionConfig config) { // first remove the null array fields final Field[] finalKeyFields = Arrays.copyOf(keyFields, comparatorHelperIndex); @SuppressWarnings("rawtypes") @@ -277,21 +278,21 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ if(finalFieldComparators.length == 0 || finalKeyFields.length == 0 || finalFieldComparators.length != finalKeyFields.length) { throw new IllegalArgumentException("Pojo comparator creation has a bug"); } - return new PojoComparator<T>(finalKeyFields, finalFieldComparators, createSerializer(), typeClass); + return new 
PojoComparator<T>(finalKeyFields, finalFieldComparators, createSerializer(config), typeClass); } @Override - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig config) { TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ]; Field[] reflectiveFields = new Field[fields.length]; for (int i = 0; i < fields.length; i++) { - fieldSerializers[i] = fields[i].type.createSerializer(); + fieldSerializers[i] = fields[i].type.createSerializer(config); reflectiveFields[i] = fields[i].field; } - return new PojoSerializer<T>(this.typeClass, fieldSerializers, reflectiveFields); + return new PojoSerializer<T>(this.typeClass, fieldSerializers, reflectiveFields, config); } // -------------------------------------------------------------------------------------------- @@ -312,7 +313,7 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ for (PojoField field : fields) { fieldStrings.add(field.field.getName() + ": " + field.type.toString()); } - return "PojoType<" + typeClass.getCanonicalName() + return "PojoType<" + typeClass.getName() + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]" + ">"; } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java index 2464f25..60fa110 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import 
org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.record.RecordSerializer; @@ -60,7 +61,7 @@ public class RecordTypeInfo extends TypeInformation<Record> { } @Override - public TypeSerializer<Record> createSerializer() { + public TypeSerializer<Record> createSerializer(ExecutionConfig config) { return RecordSerializer.get(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index af258e2..c1e573d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -20,11 +20,12 @@ package org.apache.flink.api.java.typeutils; import java.util.Arrays; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import org.apache.flink.api.java.tuple.*; //CHECKSTYLE.ON: AvoidStarImport import org.apache.flink.api.java.typeutils.runtime.TupleComparator; @@ -48,10 +49,10 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { } @Override - public TupleSerializer<T> createSerializer() { + public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) { TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[getArity()]; for (int i = 0; i < types.length; i++) { - fieldSerializers[i] = 
types[i].createSerializer(); + fieldSerializers[i] = types[i].createSerializer(executionConfig); } Class<T> tupleClass = getTypeClass(); @@ -81,7 +82,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { } @Override - protected TypeComparator<T> getNewComparator() { + protected TypeComparator<T> getNewComparator(ExecutionConfig executionConfig) { @SuppressWarnings("rawtypes") final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex); final int[] finalLogicalKeyFields = Arrays.copyOf(logicalKeyFields, comparatorHelperIndex); @@ -93,7 +94,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { } TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1]; for (int i = 0; i <= maxKey; i++) { - fieldSerializers[i] = types[i].createSerializer(); + fieldSerializers[i] = types[i].createSerializer(executionConfig); } if(finalFieldComparators.length == 0 || finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0 || finalFieldComparators.length != finalLogicalKeyFields.length) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index c99a80f..1b3003f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -1255,6 +1255,7 @@ public class TypeExtractor { @SuppressWarnings("unchecked") private <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + // add the 
hierarchy of the POJO itself if it is generic if (parameterizedType != null) { getTypeHierarchy(typeHierarchy, parameterizedType, Object.class); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java index 79a2760..1486ee5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -79,7 +80,7 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement @Override @SuppressWarnings("unchecked") - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { if (CopyableValue.class.isAssignableFrom(type)) { return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class)); } @@ -90,7 +91,7 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending) { + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { if (!isKeyType()) { throw new RuntimeException("The type " + type.getName() + " is not Comparable."); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java index 195ce25..89d7818 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -43,7 +44,7 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending) { + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { if(Comparable.class.isAssignableFrom(typeClass)) { return new WritableComparator(sortOrderAscending, typeClass); } @@ -84,7 +85,7 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp } @Override - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { return new WritableSerializer<T>(typeClass); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index c55cd71..133dd57 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -27,6 +27,7 @@ import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.twitter.chill.ScalaKryoInstantiator; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -36,9 +37,6 @@ import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.lang.reflect.Modifier; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -55,13 +53,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { private static final long serialVersionUID = 3L; - private static Map<Class<?>, Serializer<?>> staticRegisteredSerializers = new HashMap<Class<?>, Serializer<?>>(); - private static Map<Class<?>, Class<? extends Serializer<?>>> staticRegisteredSerializersClasses = new HashMap<Class<?>, Class<? extends Serializer<?>>>(); - - private static Set<Class<?>> staticRegisteredTypes = new HashSet<Class<?>>(); - // ------------------------------------------------------------------------ - + private final Map<Class<?>, Serializer<?>> registeredSerializers; private final Map<Class<?>, Class<? 
extends Serializer<?>>> registeredSerializersClasses; private final Set<Class<?>> registeredTypes; @@ -82,29 +75,15 @@ public class KryoSerializer<T> extends TypeSerializer<T> { // ------------------------------------------------------------------------ - public KryoSerializer(Class<T> type){ + public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){ if(type == null){ throw new NullPointerException("Type class cannot be null."); } this.type = type; - // create copies of the statically registered serializers - // we use static synchronization to safeguard against concurrent use - // of the static collections. - synchronized (KryoSerializer.class) { - this.registeredSerializers = staticRegisteredSerializers.isEmpty() ? - Collections.<Class<?>, Serializer<?>>emptyMap() : - new HashMap<Class<?>, Serializer<?>>(staticRegisteredSerializers); - - this.registeredSerializersClasses = staticRegisteredSerializersClasses.isEmpty() ? - Collections.<Class<?>, Class<? extends Serializer<?>>>emptyMap() : - new HashMap<Class<?>, Class<? extends Serializer<?>>>(staticRegisteredSerializersClasses); - - this.registeredTypes = staticRegisteredTypes.isEmpty() ? 
- Collections.<Class<?>>emptySet() : - new HashSet<Class<?>>(staticRegisteredTypes); - } - + this.registeredSerializers = executionConfig.getRegisteredKryoSerializers(); + this.registeredSerializersClasses = executionConfig.getRegisteredKryoSerializersClasses(); + this.registeredTypes = executionConfig.getRegisteredKryoTypes(); } /** @@ -289,69 +268,6 @@ public class KryoSerializer<T> extends TypeSerializer<T> { kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } } - - // -------------------------------------------------------------------------------------------- - // For registering custom serializers and types - // -------------------------------------------------------------------------------------------- - - /** - * Registers the given Serializer as a default serializer for the given class at the Kryo - * instance. - * Note that the serializer instance must be serializable (as defined by java.io.Serializable), - * because it may be distributed to the worker nodes by java serialization. - * - * @param clazz The class of the types serialized with the given serializer. - * @param serializer The serializer to use. - * @throws IllegalArgumentException Thrown, if the serializer is not serializable. - */ - public static void registerSerializer(Class<?> clazz, Serializer<?> serializer) { - if (clazz == null || serializer == null) { - throw new NullPointerException("Cannot register null class or serializer."); - } - if (!(serializer instanceof java.io.Serializable)) { - throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), " - + "as defined by java.io.Serializable. 
For stateless serializers, you can use the " - + "'registerSerializer(Class, Class)' method to register the serializer via its class."); - } - - synchronized (KryoSerializer.class) { - staticRegisteredSerializers.put(clazz, serializer); - } - } - - /** - * Registers a serializer via its class as a default serializer for the given class at the Kryo - * instance. - * - * @param clazz The class of the types serialized with the given serializer. - * @param serializerClass The serializer to use. - */ - public static void registerSerializer(Class<?> clazz, Class<? extends Serializer<?>> serializerClass) { - if (clazz == null || serializerClass == null) { - throw new NullPointerException("Cannot register null class or serializer."); - } - - synchronized (KryoSerializer.class) { - staticRegisteredSerializersClasses.put(clazz, serializerClass); - } - } - - /** - * Registers the given type with Kryo. Registering the type allows Kryo to write abbreviated - * name tags, rather than full class names, thereby vastly increasing the serialization - * performance in many cases. - * - * @param type The class of the type to register. 
- */ - public static void registerType(Class<?> type) { - if (type == null) { - throw new NullPointerException("Cannot register null type class."); - } - - synchronized (KryoSerializer.class) { - staticRegisteredTypes.add(type); - } - } // -------------------------------------------------------------------------------------------- // For testing http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 15e8537..6033094 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -22,36 +22,96 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; public final class PojoSerializer<T> extends TypeSerializer<T> { + // Flags for the header + private static byte IS_NULL = 1; + private static byte NO_SUBCLASS = 2; + private static byte IS_SUBCLASS = 4; + private static byte IS_TAGGED_SUBCLASS = 8; + private static final long serialVersionUID = 1L; private 
final Class<T> clazz; - private final TypeSerializer<Object>[] fieldSerializers; + private TypeSerializer<Object>[] fieldSerializers; // We need to handle these ourselves in writeObject()/readObject() private transient Field[] fields; - private final int numFields; + private int numFields; + + private transient Map<Class<?>, TypeSerializer> subclassSerializerCache; + private transient ClassLoader cl; + + private Map<Class<?>, Integer> registeredClasses; + + private TypeSerializer[] registeredSerializers; + + private final ExecutionConfig executionConfig; @SuppressWarnings("unchecked") - public PojoSerializer(Class<T> clazz, TypeSerializer<?>[] fieldSerializers, Field[] fields) { + public PojoSerializer( + Class<T> clazz, + TypeSerializer<?>[] fieldSerializers, + Field[] fields, + ExecutionConfig executionConfig) { this.clazz = clazz; this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers; this.fields = fields; this.numFields = fieldSerializers.length; + this.executionConfig = executionConfig; + + Set<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); for (int i = 0; i < numFields; i++) { this.fields[i].setAccessible(true); } + + cl = Thread.currentThread().getContextClassLoader(); + + subclassSerializerCache = new HashMap<Class<?>, TypeSerializer>(); + + // We only want those classes that are not our own class and are actually sub-classes. 
+ List<Class<?>> cleanedTaggedClasses = new ArrayList<Class<?>>(registeredPojoTypes.size()); + for (Class<?> registeredClass: registeredPojoTypes) { + if (registeredClass.equals(clazz)) { + continue; + } + if (!clazz.isAssignableFrom(registeredClass)) { + continue; + } + cleanedTaggedClasses.add(registeredClass); + + } + this.registeredClasses = new LinkedHashMap<Class<?>, Integer>(cleanedTaggedClasses.size()); + registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()]; + + int id = 0; + for (Class<?> registeredClass: cleanedTaggedClasses) { + this.registeredClasses.put(registeredClass, id); + TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(registeredClass); + registeredSerializers[id] = typeInfo.createSerializer(executionConfig); + + id++; + } } private void writeObject(ObjectOutputStream out) @@ -67,9 +127,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - int numKeyFields = in.readInt(); - fields = new Field[numKeyFields]; - for (int i = 0; i < numKeyFields; i++) { + int numFields = in.readInt(); + fields = new Field[numFields]; + for (int i = 0; i < numFields; i++) { Class<?> clazz = (Class<?>)in.readObject(); String fieldName = in.readUTF(); fields[i] = null; @@ -88,8 +148,42 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { + " (" + fieldName + ")"); } } + + cl = Thread.currentThread().getContextClassLoader(); + subclassSerializerCache = new HashMap<Class<?>, TypeSerializer>(); + } + + private TypeSerializer getSubclassSerializer(Class<?> subclass) { + TypeSerializer<?> result = subclassSerializerCache.get(subclass); + if (result == null) { + + TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(subclass); + result = typeInfo.createSerializer(executionConfig); + if (result instanceof PojoSerializer) { + PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) result; + 
subclassSerializer.copyBaseFieldOrder(this); + } + subclassSerializerCache.put(subclass, result); + + } + return result; + } + + private boolean hasField(Field f) { + for (Field field: fields) { + if (f.equals(field)) { + return true; + } + } + return false; + } + + @SuppressWarnings("unchecked") + private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) { + // do nothing for now, but in the future, adapt subclass serializer to have same + // ordering as base class serializer so that binary comparison on base class fields + // can work } - @Override public boolean isImmutableType() { @@ -110,7 +204,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } if (stateful) { - return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields); + return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig); } else { return this; } @@ -119,13 +213,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { @Override public T createInstance() { + if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) { + return null; + } try { T t = clazz.newInstance(); - - for (int i = 0; i < numFields; i++) { - fields[i].set(t, fieldSerializers[i].createInstance()); - } - + initializeFields(t); return t; } catch (Exception e) { @@ -133,51 +226,89 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } } + protected void initializeFields(T t) { + for (int i = 0; i < numFields; i++) { + try { + fields[i].set(t, fieldSerializers[i].createInstance()); + } catch (IllegalAccessException e) { + throw new RuntimeException("Cannot initialize fields.", e); + } + } + } + @Override + @SuppressWarnings("unchecked") public T copy(T from) { - T target; - try { - target = clazz.newInstance(); - } - catch (Throwable t) { - throw new RuntimeException("Cannot instantiate class.", t); + if (from == null) { + return null; } - - try { - for (int i = 0; i < numFields; i++) { - Object value = fields[i].get(from); - if 
(value != null) { - Object copy = fieldSerializers[i].copy(value); - fields[i].set(target, copy); - } - else { - fields[i].set(target, null); + + Class<?> actualType = from.getClass(); + if (actualType == clazz) { + T target; + try { + target = (T) from.getClass().newInstance(); + } + catch (Throwable t) { + throw new RuntimeException("Cannot instantiate class.", t); + } + // no subclass + try { + for (int i = 0; i < numFields; i++) { + Object value = fields[i].get(from); + if (value != null) { + Object copy = fieldSerializers[i].copy(value); + fields[i].set(target, copy); + } + else { + fields[i].set(target, null); + } } + } catch (IllegalAccessException e) { + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before."); + } + return target; + } else { + // subclass + TypeSerializer subclassSerializer = getSubclassSerializer(actualType); + return (T) subclassSerializer.copy(from); } - catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before."); - } - return target; } @Override + @SuppressWarnings("unchecked") public T copy(T from, T reuse) { - try { - for (int i = 0; i < numFields; i++) { - Object value = fields[i].get(from); - if (value != null) { - Object copy = fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse)); - fields[i].set(reuse, copy); - } - else { - fields[i].set(reuse, null); + if (from == null) { + return null; + } + + Class<?> actualType = from.getClass(); + if (reuse == null || actualType != reuse.getClass()) { + // cannot reuse, do a non-reuse copy + return copy(from); + } + + if (actualType == clazz) { + try { + for (int i = 0; i < numFields; i++) { + Object value = fields[i].get(from); + if (value != null) { + Object copy = fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse)); + fields[i].set(reuse, copy); + } + else { + fields[i].set(reuse, null); + } } + } catch 
(IllegalAccessException e) { + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + - "before."); + } else { + TypeSerializer subclassSerializer = getSubclassSerializer(actualType); + reuse = (T) subclassSerializer.copy(from, reuse); } + return reuse; } @@ -188,94 +319,228 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { @Override + @SuppressWarnings("unchecked") public void serialize(T value, DataOutputView target) throws IOException { + int flags = 0; // handle null values if (value == null) { - target.writeBoolean(true); + flags |= IS_NULL; + target.writeByte(flags); return; + } + + Integer subclassTag = -1; + Class<?> actualClass = value.getClass(); + TypeSerializer subclassSerializer = null; + if (clazz != actualClass) { + subclassTag = registeredClasses.get(actualClass); + if (subclassTag != null) { + flags |= IS_TAGGED_SUBCLASS; + subclassSerializer = registeredSerializers[subclassTag]; + } else { + flags |= IS_SUBCLASS; + subclassSerializer = getSubclassSerializer(actualClass); + } } else { - target.writeBoolean(false); + flags |= NO_SUBCLASS; } - try { - for (int i = 0; i < numFields; i++) { - Object o = fields[i].get(value); - if(o == null) { - target.writeBoolean(true); // null field handling - } else { - target.writeBoolean(false); - fieldSerializers[i].serialize(o, target); + + target.writeByte(flags); + + if ((flags & IS_SUBCLASS) != 0) { + target.writeUTF(actualClass.getName()); + } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { + target.writeByte(subclassTag); + } + + + if ((flags & NO_SUBCLASS) != 0) { + try { + for (int i = 0; i < numFields; i++) { + Object o = fields[i].get(value); + if (o == null) { + target.writeBoolean(true); // null field handling + } else { + target.writeBoolean(false); + 
fieldSerializers[i].serialize(o, target); + } } + } catch (IllegalAccessException e) { + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); + + } + } else { + // subclass + if (subclassSerializer != null) { + subclassSerializer.serialize(value, target); } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + - "before."); } } @Override + @SuppressWarnings("unchecked") public T deserialize(DataInputView source) throws IOException { - boolean isNull = source.readBoolean(); - if(isNull) { + int flags = source.readByte(); + if((flags & IS_NULL) != 0) { return null; } + T target; - try { - target = clazz.newInstance(); - } - catch (Throwable t) { - throw new RuntimeException("Cannot instantiate class.", t); + + Class<?> actualSubclass = null; + TypeSerializer subclassSerializer = null; + + if ((flags & IS_SUBCLASS) != 0) { + String subclassName = source.readUTF(); + try { + actualSubclass = Class.forName(subclassName, true, cl); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot instantiate class.", e); + } + subclassSerializer = getSubclassSerializer(actualSubclass); + target = (T) subclassSerializer.createInstance(); + // also initialize fields for which the subclass serializer is not responsible + initializeFields(target); + } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { + + int subclassTag = source.readByte(); + subclassSerializer = registeredSerializers[subclassTag]; + target = (T) subclassSerializer.createInstance(); + // also initialize fields for which the subclass serializer is not responsible + initializeFields(target); + } else { + target = createInstance(); } - - try { - for (int i = 0; i < numFields; i++) { - isNull = source.readBoolean(); - if(isNull) { - fields[i].set(target, null); - } else { - Object field = fieldSerializers[i].deserialize(source); - fields[i].set(target, 
field); + + if ((flags & NO_SUBCLASS) != 0) { + try { + for (int i = 0; i < numFields; i++) { + boolean isNull = source.readBoolean(); + if (isNull) { + fields[i].set(target, null); + } else { + Object field = fieldSerializers[i].deserialize(source); + fields[i].set(target, field); + } } + } catch (IllegalAccessException e) { + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); + + } + } else { + if (subclassSerializer != null) { + target = (T) subclassSerializer.deserialize(target, source); } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + - "before."); } return target; } @Override + @SuppressWarnings("unchecked") public T deserialize(T reuse, DataInputView source) throws IOException { + // handle null values - boolean isNull = source.readBoolean(); - if (isNull) { + int flags = source.readByte(); + if((flags & IS_NULL) != 0) { return null; } - try { - for (int i = 0; i < numFields; i++) { - isNull = source.readBoolean(); - if(isNull) { - fields[i].set(reuse, null); - } else { - Object field = fieldSerializers[i].deserialize(fields[i].get(reuse), source); - fields[i].set(reuse, field); + + Class<?> subclass = null; + TypeSerializer subclassSerializer = null; + if ((flags & IS_SUBCLASS) != 0) { + String subclassName = source.readUTF(); + try { + subclass = Class.forName(subclassName, true, cl); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot instantiate class.", e); + } + subclassSerializer = getSubclassSerializer(subclass); + + if (reuse == null || subclass != reuse.getClass()) { + // cannot reuse + reuse = (T) subclassSerializer.createInstance(); + // also initialize fields for which the subclass serializer is not responsible + initializeFields(reuse); + } + } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { + int subclassTag = source.readByte(); + subclassSerializer = 
registeredSerializers[subclassTag]; + + if (reuse == null || ((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) { + // cannot reuse + reuse = (T) subclassSerializer.createInstance(); + // also initialize fields for which the subclass serializer is not responsible + initializeFields(reuse); + } + } else { + if (reuse == null || clazz != reuse.getClass()) { + reuse = createInstance(); + } + } + + if ((flags & NO_SUBCLASS) != 0) { + try { + for (int i = 0; i < numFields; i++) { + boolean isNull = source.readBoolean(); + if (isNull) { + fields[i].set(reuse, null); + } else { + Object field = fieldSerializers[i].deserialize(fields[i].get(reuse), source); + + fields[i].set(reuse, field); + } } + } catch (IllegalAccessException e) { + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); + } + } else { + if (subclassSerializer != null) { + reuse = (T) subclassSerializer.deserialize(reuse, source); } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + - "before."); } + return reuse; } @Override public void copy(DataInputView source, DataOutputView target) throws IOException { - // copy the Non-Null/Null tag - target.writeBoolean(source.readBoolean()); - for (int i = 0; i < numFields; i++) { - boolean isNull = source.readBoolean(); - target.writeBoolean(isNull); - if (!isNull) { - fieldSerializers[i].copy(source, target); + // copy the flags + int flags = source.readByte(); + target.writeByte(flags); + + if ((flags & IS_NULL) != 0) { + // is a null value, nothing further to copy + return; + } + + TypeSerializer<?> subclassSerializer = null; + if ((flags & IS_SUBCLASS) != 0) { + String className = source.readUTF(); + target.writeUTF(className); + try { + Class<?> subclass = Class.forName(className, true, Thread.currentThread() + .getContextClassLoader()); + subclassSerializer = getSubclassSerializer(subclass); 
+ } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot instantiate class.", e); + } + } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { + int subclassTag = source.readByte(); + target.writeByte(subclassTag); + subclassSerializer = registeredSerializers[subclassTag]; + } + + if ((flags & NO_SUBCLASS) != 0) { + for (int i = 0; i < numFields; i++) { + boolean isNull = source.readBoolean(); + target.writeBoolean(isNull); + if (!isNull) { + fieldSerializers[i].copy(source, target); + } + } + } else { + if (subclassSerializer != null) { + subclassSerializer.copy(source, target); } } }
