[FLINK-1369] [types] Add support for Subclasses, Interfaces, Abstract Classes.
- Abstract classes with fields are handled as POJO types. - Interfaces and abstract classes without fields are handled as generic types. This closes #236 This closes #316 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7407076d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7407076d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7407076d Branch: refs/heads/master Commit: 7407076d3990752eb5fa4072cd036efd2f656cbc Parents: 6b402f4 Author: Aljoscha Krettek <[email protected]> Authored: Wed Nov 26 13:27:06 2014 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Feb 10 13:15:03 2015 +0100 ---------------------------------------------------------------------- .../compiler/postpass/JavaApiPostPass.java | 15 +- .../flink/compiler/util/NoOpBinaryUdfOp.java | 3 +- .../flink/compiler/util/NoOpUnaryUdfOp.java | 3 +- flink-core/pom.xml | 48 +- .../flink/api/common/ExecutionConfig.java | 160 ++++++- .../api/common/functions/RuntimeContext.java | 7 + .../util/AbstractRuntimeUDFContext.java | 17 +- .../functions/util/RuntimeUDFContext.java | 9 +- .../common/operators/CollectionExecutor.java | 32 +- .../api/common/operators/DualInputOperator.java | 3 +- .../common/operators/GenericDataSinkBase.java | 7 +- .../common/operators/GenericDataSourceBase.java | 5 +- .../common/operators/SingleInputOperator.java | 3 +- .../flink/api/common/operators/Union.java | 3 +- .../operators/base/BulkIterationBase.java | 3 +- .../operators/base/CoGroupOperatorBase.java | 27 +- .../base/CollectorMapOperatorBase.java | 3 +- .../operators/base/CrossOperatorBase.java | 13 +- .../operators/base/DeltaIterationBase.java | 3 +- .../operators/base/FilterOperatorBase.java | 3 +- .../operators/base/FlatMapOperatorBase.java | 11 +- .../operators/base/GroupReduceOperatorBase.java | 23 +- .../common/operators/base/JoinOperatorBase.java | 21 +- .../common/operators/base/MapOperatorBase.java | 11 +- .../base/MapPartitionOperatorBase.java | 11 +- .../operators/base/PartitionOperatorBase.java | 3 +- .../operators/base/ReduceOperatorBase.java | 14 +- .../flink/api/common/typeinfo/AtomicType.java | 3 +- .../api/common/typeinfo/BasicArrayTypeInfo.java | 5 +- .../api/common/typeinfo/BasicTypeInfo.java | 5 +- .../api/common/typeinfo/NothingTypeInfo.java | 3 +- .../common/typeinfo/PrimitiveArrayTypeInfo.java | 3 +- .../api/common/typeinfo/TypeInformation.java | 9 +- .../api/common/typeutils/CompositeType.java | 11 +- .../functions/util/RuntimeUDFContextTest.java | 11 +- .../base/FlatMapOperatorCollectionTest.java | 9 +- .../operators/base/JoinOperatorBaseTest.java | 15 +- .../common/operators/base/MapOperatorTest.java | 16 +- .../base/PartitionMapOperatorTest.java | 10 +- .../common/typeutils/SerializerTestBase.java | 2 +- .../flink/api/java/CollectionEnvironment.java | 2 +- .../java/org/apache/flink/api/java/DataSet.java | 12 +- .../flink/api/java/ExecutionEnvironment.java | 43 +- .../flink/api/java/io/CsvOutputFormat.java | 3 +- .../java/io/LocalCollectionOutputFormat.java | 5 +- .../api/java/io/TypeSerializerInputFormat.java | 4 +- .../api/java/io/TypeSerializerOutputFormat.java | 5 +- .../flink/api/java/operators/CrossOperator.java | 4 +- .../flink/api/java/operators/JoinOperator.java | 4 +- .../api/java/operators/ProjectOperator.java | 4 +- .../translation/PlanProjectOperator.java | 5 +- .../flink/api/java/typeutils/EnumTypeInfo.java | 5 +- .../api/java/typeutils/GenericTypeInfo.java | 12 +- .../java/typeutils/InputTypeConfigurable.java | 9 +- .../api/java/typeutils/MissingTypeInfo.java | 3 +- .../api/java/typeutils/ObjectArrayTypeInfo.java | 7 +- .../flink/api/java/typeutils/PojoTypeInfo.java | 17 +- .../api/java/typeutils/RecordTypeInfo.java | 3 +- .../flink/api/java/typeutils/TupleTypeInfo.java | 11 +- .../flink/api/java/typeutils/TypeExtractor.java | 1 + .../flink/api/java/typeutils/ValueTypeInfo.java | 5 +- .../api/java/typeutils/WritableTypeInfo.java | 5 +- .../java/typeutils/runtime/KryoSerializer.java | 96 +--- .../java/typeutils/runtime/PojoSerializer.java | 459 +++++++++++++++---- .../base/CoGroupOperatorCollectionTest.java | 20 +- .../operators/base/GroupReduceOperatorTest.java | 17 +- .../operators/base/JoinOperatorBaseTest.java | 9 +- .../operators/base/ReduceOperatorTest.java | 15 +- .../api/java/io/CollectionInputFormatTest.java | 5 +- .../api/java/io/TypeSerializerFormatTest.java | 3 +- .../java/type/extractor/TypeExtractorTest.java | 38 +- .../api/java/typeutils/CompositeTypeTest.java | 3 +- .../api/java/typeutils/TypeInfoParserTest.java | 8 +- .../runtime/CopyableValueComparatorTest.java | 2 - .../runtime/KryoGenericArraySerializerTest.java | 3 +- .../runtime/KryoGenericTypeComparatorTest.java | 3 +- .../runtime/KryoGenericTypeSerializerTest.java | 5 +- .../runtime/KryoVersusAvroMinibenchmark.java | 13 +- .../runtime/KryoWithCustomSerializersTest.java | 12 +- .../MultidimensionalArraySerializerTest.java | 11 +- .../typeutils/runtime/PojoComparatorTest.java | 5 +- .../runtime/PojoGenericTypeSerializerTest.java | 3 +- .../typeutils/runtime/PojoSerializerTest.java | 13 +- .../runtime/PojoSubclassComparatorTest.java | 76 +++ .../runtime/PojoSubclassSerializerTest.java | 196 ++++++++ .../SubclassFromInterfaceSerializerTest.java | 170 +++++++ .../runtime/TupleComparatorILD2Test.java | 2 - .../runtime/TupleComparatorILD3Test.java | 2 - .../runtime/TupleComparatorILDC3Test.java | 2 - .../runtime/TupleComparatorILDX1Test.java | 2 - .../runtime/TupleComparatorILDXC2Test.java | 2 - .../runtime/TupleComparatorISD1Test.java | 2 - .../runtime/TupleComparatorISD2Test.java | 2 - .../runtime/TupleComparatorISD3Test.java | 2 - .../typeutils/runtime/TupleSerializerTest.java | 3 +- .../typeutils/runtime/ValueComparatorTest.java | 2 - .../runtime/WritableComparatorTest.java | 2 - .../runtime/WritableSerializerTest.java | 4 +- .../java/type/lambdas/LambdaExtractionTest.java | 1 - .../task/AbstractIterativePactTask.java | 7 +- .../jobgraph/tasks/AbstractInvokable.java | 27 ++ .../flink/runtime/operators/DataSourceTask.java | 18 - .../runtime/operators/PactTaskContext.java | 6 +- .../runtime/operators/RegularPactTask.java | 19 +- .../operators/chaining/ChainedDriver.java | 2 +- .../operators/sort/LargeRecordHandler.java | 11 +- .../util/DistributedRuntimeUDFContext.java | 9 +- .../drivers/AllGroupReduceDriverTest.java | 15 +- .../operators/drivers/AllReduceDriverTest.java | 27 +- .../drivers/GroupReduceDriverTest.java | 39 +- .../drivers/ReduceCombineDriverTest.java | 39 +- .../operators/drivers/ReduceDriverTest.java | 39 +- .../sort/ExternalSortLargeRecordsITCase.java | 17 +- .../sort/LargeRecordHandlerITCase.java | 7 +- .../operators/sort/LargeRecordHandlerTest.java | 13 +- .../sort/MassiveStringSortingITCase.java | 5 +- .../sort/MassiveStringValueSortingITCase.java | 5 +- .../scala/operators/ScalaAggregateOperator.java | 15 +- .../scala/operators/ScalaCsvInputFormat.java | 7 +- .../scala/operators/ScalaCsvOutputFormat.java | 3 +- .../apache/flink/api/scala/CrossDataSet.scala | 5 +- .../flink/api/scala/ExecutionEnvironment.scala | 8 +- .../api/scala/UnfinishedCoGroupOperation.scala | 6 +- .../flink/api/scala/codegen/TypeAnalyzer.scala | 8 +- .../api/scala/codegen/TypeInformationGen.scala | 12 +- .../apache/flink/api/scala/joinDataSet.scala | 6 +- .../api/scala/typeutils/CaseClassTypeInfo.scala | 11 +- .../api/scala/typeutils/EitherTypeInfo.scala | 17 +- .../api/scala/typeutils/OptionTypeInfo.scala | 6 +- .../scala/typeutils/TraversableTypeInfo.scala | 3 +- .../api/scala/typeutils/TrySerializer.scala | 5 +- .../flink/api/scala/typeutils/TryTypeInfo.scala | 8 +- .../mapred/HadoopReduceCombineFunction.java | 4 +- .../mapred/HadoopReduceFunction.java | 4 +- .../wrapper/HadoopTupleUnwrappingIterator.java | 5 +- .../HadoopTupleUnwrappingIteratorTest.java | 4 +- .../apache/flink/streaming/api/StreamGraph.java | 19 +- .../streaming/api/datastream/DataStream.java | 2 +- .../temporaloperator/StreamJoinOperator.java | 8 +- .../environment/StreamExecutionEnvironment.java | 63 ++- .../aggregation/AggregationFunction.java | 4 +- .../aggregation/ComparableAggregator.java | 2 +- .../api/function/aggregation/SumAggregator.java | 2 +- .../api/function/source/FileSourceFunction.java | 4 +- .../api/invokable/StreamInvokable.java | 9 +- .../operator/GroupedWindowInvokable.java | 2 +- .../invokable/operator/ProjectInvokable.java | 2 +- .../api/invokable/operator/co/CoInvokable.java | 3 +- .../streamrecord/StreamRecordSerializer.java | 5 +- .../api/streamvertex/CoStreamVertex.java | 2 +- .../api/streamvertex/StreamVertex.java | 4 +- .../streamvertex/StreamingRuntimeContext.java | 5 +- .../streaming/util/keys/KeySelectorUtil.java | 6 +- .../streaming/api/AggregationFunctionTest.java | 3 +- .../flink/streaming/util/MockCoContext.java | 7 +- .../flink/streaming/util/MockContext.java | 5 +- .../flink/streaming/api/scala/DataStream.scala | 6 +- .../api/scala/StreamCrossOperator.scala | 6 +- .../api/scala/StreamExecutionEnvironment.scala | 30 ++ .../api/scala/StreamJoinOperator.scala | 25 +- .../api/scala/WindowedDataStream.scala | 8 +- .../WordCountSubclassInterfacePOJOITCase.java | 152 ++++++ .../WordCountSubclassPOJOITCase.java | 123 +++++ .../scala/io/CollectionInputFormatTest.scala | 10 +- .../misc/MassiveCaseClassSortingITCase.scala | 9 +- .../scala/runtime/CaseClassComparatorTest.scala | 9 +- .../runtime/KryoGenericTypeSerializerTest.scala | 13 +- .../ScalaSpecialTypesSerializerTest.scala | 6 +- .../runtime/TraversableSerializerTest.scala | 5 +- .../scala/runtime/TupleComparatorILD2Test.scala | 8 +- .../scala/runtime/TupleComparatorILD3Test.scala | 9 +- .../runtime/TupleComparatorILDC3Test.scala | 9 +- .../runtime/TupleComparatorILDX1Test.scala | 5 +- .../runtime/TupleComparatorILDXC2Test.scala | 5 +- .../scala/runtime/TupleComparatorISD1Test.scala | 5 +- .../scala/runtime/TupleComparatorISD2Test.scala | 5 +- .../scala/runtime/TupleComparatorISD3Test.scala | 9 +- .../api/scala/runtime/TupleSerializerTest.scala | 10 +- .../scala/types/TypeInformationGenTest.scala | 1 + 179 files changed, 2211 insertions(+), 801 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java index 208ff2e..11ac231 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.Operator; @@ -70,9 +71,13 @@ public class JavaApiPostPass implements OptimizerPostPass { private final Set<PlanNode> alreadyDone = new HashSet<PlanNode>(); + private ExecutionConfig executionConfig = null; @Override public void postPass(OptimizedPlan plan) { + + executionConfig = plan.getOriginalPactPlan().getExecutionConfig(); + for (SinkPlanNode sink : plan.getDataSinks()) { traverse(sink); } @@ -275,22 +280,22 @@ public class JavaApiPostPass implements OptimizerPostPass { } } - private static <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) { - TypeSerializer<T> serializer = typeInfo.createSerializer(); + private <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) { + TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig); return new RuntimeSerializerFactory<T>(serializer, typeInfo.getTypeClass()); } @SuppressWarnings("unchecked") - private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) { + private <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) { TypeComparator<T> comparator; if (typeInfo instanceof CompositeType) { - comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder, 0); + comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder, 0, executionConfig); } else if (typeInfo instanceof AtomicType) { // handle grouping of atomic types - comparator = ((AtomicType<T>) typeInfo).createComparator(sortOrder[0]); + comparator = ((AtomicType<T>) typeInfo).createComparator(sortOrder[0], executionConfig); } else { throw new RuntimeException("Unrecognized type: " + typeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java index 4b48ec7..166b7b8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java @@ -20,6 +20,7 @@ package org.apache.flink.compiler.util; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.NoOpFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; @@ -43,7 +44,7 @@ public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpF } @Override - protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, boolean mutables) { + protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java index 474d3a4..5013ae5 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java @@ -20,6 +20,7 @@ package org.apache.flink.compiler.util; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.NoOpFunction; import org.apache.flink.api.common.operators.RecordOperator; @@ -54,7 +55,7 @@ public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunct } @Override - protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, boolean mutables) { + protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { return inputData; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 810860e..182a77a 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -46,7 +46,18 @@ under the License. <artifactId>commons-collections</artifactId> <!-- managed version --> </dependency> - + + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill_2.10</artifactId> + <version>0.5.1</version> + </dependency> + <!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading --> <dependency> <groupId>com.google.guava</groupId> @@ -72,4 +83,39 @@ under the License. </plugins> </build> + <!-- See main pom.xml for explanation of profiles --> + <profiles> + <profile> + <id>hadoop-1</id> + <activation> + <property> + <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop1--><name>hadoop.profile</name><value>1</value> + </property> + </activation> + <dependencies> + <!-- "Old" Hadoop = MapReduce v1 --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </dependency> + </dependencies> + </profile> + <profile> + <id>hadoop-2</id> + <activation> + <property> + <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop2--><name>!hadoop.profile</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> + </project> http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 17e683f..5fa01b7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -18,7 +18,13 @@ package org.apache.flink.api.common; +import com.esotericsoftware.kryo.Serializer; + import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; /** * A configuration config for configuring behavior of the system, such as whether to use @@ -34,11 +40,17 @@ public class ExecutionConfig implements Serializable { private boolean useClosureCleaner = true; private int degreeOfParallelism = -1; private int numberOfExecutionRetries = -1; + private boolean forceKryo = false; - // For future use... -// private boolean forceGenericSerializer = false; private boolean objectReuse = false; + + // Serializers and types registered with Kryo and the PojoSerializer + private final Map<Class<?>, Serializer<?>> registeredKryoSerializers = new HashMap<Class<?>, Serializer<?>>(); + private final Map<Class<?>, Class<? extends Serializer<?>>> registeredKryoSerializersClasses = new HashMap<Class<?>, Class<? extends Serializer<?>>>(); + private final Set<Class<?>> registeredKryoTypes = new HashSet<Class<?>>(); + private final Set<Class<?>> registeredPojoTypes = new HashSet<Class<?>>(); + /** * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null * that are not used. This will in most cases make closures or anonymous inner classes @@ -128,21 +140,26 @@ public class ExecutionConfig implements Serializable { return this; } - // These are for future use... -// public ExecutionConfig forceGenericSerializer() { -// forceGenericSerializer = true; -// return this; -// } -// -// public ExecutionConfig disableForceGenericSerializer() { -// forceGenericSerializer = false; -// return this; -// } -// -// public boolean isForceGenericSerializerEnabled() { -// return forceGenericSerializer; -// } -// + /** + * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO. + * In some cases this might be preferable. For example, when using interfaces + * with subclasses that cannot be analyzed as POJO. + */ + public void enableForceKryo() { + forceKryo = true; + } + + + /** + * Disable use of Kryo serializer for all POJOs. + */ + public void disableForceKryo() { + forceKryo = false; + } + + public boolean isForceKryoEnabled() { + return forceKryo; + } /** * Enables reusing objects that Flink internally uses for deserialization and passing @@ -169,4 +186,113 @@ public class ExecutionConfig implements Serializable { public boolean isObjectReuseEnabled() { return objectReuse; } + + // -------------------------------------------------------------------------------------------- + // Registry for types and serializers + // -------------------------------------------------------------------------------------------- + + /** + * Registers the given Serializer as a default serializer for the given type at the + * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}. + * + * Note that the serializer instance must be serializable (as defined by java.io.Serializable), + * because it may be distributed to the worker nodes by java serialization. + * + * @param type The class of the types serialized with the given serializer. + * @param serializer The serializer to use. + */ + public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) { + if (type == null || serializer == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + if (!(serializer instanceof java.io.Serializable)) { + throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), " + + "as defined by java.io.Serializable. For stateless serializers, you can use the " + + "'registerSerializer(Class, Class)' method to register the serializer via its class."); + } + + registeredKryoSerializers.put(type, serializer); + } + + /** + * Registers the given Serializer via its class as a serializer for the given type at the + * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}. + * + * @param type The class of the types serialized with the given serializer. + * @param serializerClass The class of the serializer to use. + */ + public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { + if (type == null || serializerClass == null) { + throw new NullPointerException("Cannot register null class or serializer."); + } + + registeredKryoSerializersClasses.put(type, serializerClass); + } + + /** + * Registers the given type with the serialization stack. If the type is eventually + * serialized as a POJO, then the type is registered with the POJO serializer. If the + * type ends up being serialized with Kryo, then it will be registered at Kryo to make + * sure that only tags are written. + * + * @param type The class of the type to register. + */ + public void registerPojoType(Class<?> type) { + if (type == null) { + throw new NullPointerException("Cannot register null type class."); + } + registeredPojoTypes.add(type); + } + + /** + * Registers the given type with the serialization stack. If the type is eventually + * serialized as a POJO, then the type is registered with the POJO serializer. If the + * type ends up being serialized with Kryo, then it will be registered at Kryo to make + * sure that only tags are written. + * + * @param type The class of the type to register. + */ + public void registerKryoType(Class<?> type) { + if (type == null) { + throw new NullPointerException("Cannot register null type class."); + } + registeredKryoTypes.add(type); + } + + /** + * Returns the registered Kryo Serializers. + */ + public Map<Class<?>, Serializer<?>> getRegisteredKryoSerializers() { + return registeredKryoSerializers; + } + + /** + * Returns the registered Kryo Serializer classes. + */ + public Map<Class<?>, Class<? extends Serializer<?>>> getRegisteredKryoSerializersClasses() { + return registeredKryoSerializersClasses; + } + + /** + * Returns the registered Kryo types. + */ + public Set<Class<?>> getRegisteredKryoTypes() { + if (isForceKryoEnabled()) { + // if we force kryo, we must also return all the types that + // were previously only registered as POJO + Set<Class<?>> result = new HashSet<Class<?>>(); + result.addAll(registeredKryoTypes); + result.addAll(registeredPojoTypes); + return result; + } else { + return registeredKryoTypes; + } + } + + /** + * Returns the registered POJO types. + */ + public Set<Class<?>> getRegisteredPojoTypes() { + return registeredPojoTypes; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index ab938c0..e9209a8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.Histogram; @@ -60,6 +61,12 @@ public interface RuntimeContext { * @return The number of the parallel subtask. */ int getIndexOfThisSubtask(); + + /** + * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing + * job. + */ + ExecutionConfig getExecutionConfig(); /** * Gets the ClassLoader to load classes that were are not in system's classpath, but are part of the http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 6b755e1..c04548c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.FutureTask; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.accumulators.DoubleCounter; @@ -46,23 +47,31 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { private final ClassLoader userCodeClassLoader; + private final ExecutionConfig executionConfig; + private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>(); private final DistributedCache distributedCache = new DistributedCache(); - public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) { + public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) { this.name = name; this.numParallelSubtasks = numParallelSubtasks; this.subtaskIndex = subtaskIndex; this.userCodeClassLoader = userCodeClassLoader; + this.executionConfig = executionConfig; } - public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) { - this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader); + public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) { + this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig); this.distributedCache.setCopyTasks(cpTasks); } - + + @Override + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + @Override public String getTaskName() { return this.name; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java index 74fddef..b9c98cd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.FutureTask; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.core.fs.Path; @@ -37,12 +38,12 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>(); - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader); + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig); } - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, cpTasks); + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index ea68554..afccd7c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; @@ -69,15 +70,18 @@ public class CollectionExecutor { private final ClassLoader classLoader; private final boolean mutableObjectSafeMode; + + private final ExecutionConfig executionConfig; // -------------------------------------------------------------------------------------------- - public CollectionExecutor() { - this(DEFAULT_MUTABLE_OBJECT_SAFE_MODE); + public CollectionExecutor(ExecutionConfig executionConfig) { + this(executionConfig, DEFAULT_MUTABLE_OBJECT_SAFE_MODE); } - public CollectionExecutor(boolean mutableObjectSafeMode) { + public CollectionExecutor(ExecutionConfig executionConfig, boolean mutableObjectSafeMode) { this.mutableObjectSafeMode = mutableObjectSafeMode; + this.executionConfig = executionConfig; this.intermediateResults = new HashMap<Operator<?>, List<?>>(); this.accumulators = new HashMap<String, Accumulator<?,?>>(); @@ -161,13 +165,13 @@ public class CollectionExecutor { @SuppressWarnings("unchecked") GenericDataSinkBase<IN> typedSink = (GenericDataSinkBase<IN>) sink; - typedSink.executeOnCollections(input); + typedSink.executeOnCollections(input, executionConfig); } private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source) throws Exception { @SuppressWarnings("unchecked") GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source; - return typedSource.executeOnCollections(mutableObjectSafeMode); + return typedSource.executeOnCollections(executionConfig, mutableObjectSafeMode); } private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> operator, int superStep) throws Exception { @@ -185,8 +189,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader()) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader); + ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig); for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) { List<?> bcData = execute(bcInputs.getValue()); @@ -196,7 +200,7 @@ public class CollectionExecutor { ctx = null; } - List<OUT> result = typedOp.executeOnCollections(inputData, ctx, mutableObjectSafeMode); + List<OUT> result = typedOp.executeOnCollections(inputData, ctx, executionConfig); if (ctx != null) { AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); @@ -227,8 +231,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader); + ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig); for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) { List<?> bcData = execute(bcInputs.getValue()); @@ -238,7 +242,7 @@ public class CollectionExecutor { ctx = null; } - List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, mutableObjectSafeMode); + List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig); if (ctx != null) { AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); @@ -349,7 +353,7 @@ public class CollectionExecutor { int[] keyColumns = iteration.getSolutionSetKeyFields(); boolean[] inputOrderings = new boolean[keyColumns.length]; - TypeComparator<T> inputComparator = ((CompositeType<T>) solutionType).createComparator(keyColumns, inputOrderings, 0); + TypeComparator<T> inputComparator = ((CompositeType<T>) solutionType).createComparator(keyColumns, inputOrderings, 0, executionConfig); Map<TypeComparable<T>, T> solutionMap = new HashMap<TypeComparable<T>, T>(solutionInputData.size()); // fill the solution from the initial input @@ -482,8 +486,8 @@ public class CollectionExecutor { private final int superstep; - public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader) { - super(name, numParallelSubtasks, subtaskIndex, classloader); + public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader, ExecutionConfig executionConfig) { + super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig); this.superstep = superstep; } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java index 9cdea6d..f43f847 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.operators.util.UserCodeWrapper; @@ -286,5 +287,5 @@ public abstract class DualInputOperator<IN1, IN2, OUT, FT extends Function> exte // -------------------------------------------------------------------------------------------- - protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception; + protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java index 242e83d..2f8a396 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.io.FinalizeOnMaster; import org.apache.flink.api.common.io.InitializeOnMaster; @@ -298,7 +299,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> { // -------------------------------------------------------------------------------------------- - protected void executeOnCollections(List<IN> inputData) throws Exception { + protected void executeOnCollections(List<IN> inputData, ExecutionConfig executionConfig) throws Exception { OutputFormat<IN> format = this.formatWrapper.getUserCodeObject(); TypeInformation<IN> inputType = getInput().getOperatorInfo().getOutputType(); @@ -308,9 +309,9 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> { final TypeComparator<IN> sortComparator; if (inputType instanceof CompositeType) { - sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0); + sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig); } else if (inputType instanceof AtomicType) { - sortComparator = ((AtomicType) inputType).createComparator(sortOrderings[0]); + sortComparator = ((AtomicType) inputType).createComparator(sortOrderings[0], executionConfig); } else { throw new UnsupportedOperationException("Local output sorting does not support type "+inputType+" yet."); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java index ad1b2e4..13c5dad 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java @@ -22,6 +22,7 @@ package org.apache.flink.api.common.operators; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; @@ -176,7 +177,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O // -------------------------------------------------------------------------------------------- - protected List<OUT> executeOnCollections(boolean mutableObjectSafe) throws Exception { + protected List<OUT> executeOnCollections(ExecutionConfig executionConfig, boolean mutableObjectSafe) throws Exception { @SuppressWarnings("unchecked") InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject(); inputFormat.configure(this.parameters); @@ -185,7 +186,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O // splits InputSplit[] splits = inputFormat.createInputSplits(1); - TypeSerializer<OUT> serializer = getOperatorInfo().getOutputType().createSerializer(); + TypeSerializer<OUT> serializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (InputSplit split : splits) { inputFormat.open(split); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java index eddf89b..ada4ab0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.operators.util.UserCodeWrapper; @@ -203,5 +204,5 @@ public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends // -------------------------------------------------------------------------------------------- - protected abstract List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception; + protected abstract List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java index d7d0e20..9586c5d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -47,7 +48,7 @@ public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> { } @Override - protected List<T> executeOnCollections(List<T> inputData1, List<T> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { + protected List<T> executeOnCollections(List<T> inputData1, List<T> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { ArrayList<T> result = new ArrayList<T>(inputData1.size() + inputData2.size()); result.addAll(inputData1); result.addAll(inputData2); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java index 31080cd..6304197 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.InvalidProgramException; @@ -298,7 +299,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich } @Override - protected List<T> executeOnCollections(List<T> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { + protected List<T> executeOnCollections(List<T> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java index 65b9d1c..4165f3d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.Partitioner; @@ -186,7 +187,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, // ------------------------------------------------------------------------ @Override - protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, boolean mutableObjectSafe) throws Exception { + protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { // -------------------------------------------------------------------- // Setup // -------------------------------------------------------------------- @@ -196,17 +197,19 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, // for the grouping / merging comparator int[] inputKeys1 = getKeyColumns(0); int[] inputKeys2 = getKeyColumns(1); + + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); boolean[] inputDirections1 = new boolean[inputKeys1.length]; boolean[] inputDirections2 = new boolean[inputKeys2.length]; Arrays.fill(inputDirections1, true); Arrays.fill(inputDirections2, true); - final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer(); - final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer(); + final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer(executionConfig); + final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer(executionConfig); - final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputDirections1); - final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputDirections2); + final TypeComparator<IN1> inputComparator1 = getTypeComparator(executionConfig, inputType1, inputKeys1, inputDirections1); + final TypeComparator<IN2> inputComparator2 = getTypeComparator(executionConfig, inputType2, inputKeys2, inputDirections2); final TypeComparator<IN1> inputSortComparator1; final TypeComparator<IN2> inputSortComparator2; @@ -227,7 +230,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, Arrays.fill(allSortDirections, 0, inputKeys1.length, true); System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys1.length, groupSortDirections.length); - inputSortComparator1 = getTypeComparator(inputType1, allSortKeys, allSortDirections); + inputSortComparator1 = getTypeComparator(executionConfig, inputType1, allSortKeys, allSortDirections); } if (groupOrder2 == null || groupOrder2.getNumberOfFields() == 0) { @@ -246,12 +249,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, Arrays.fill(allSortDirections, 0, inputKeys2.length, true); System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys2.length, groupSortDirections.length); - inputSortComparator2 = getTypeComparator(inputType2, allSortKeys, allSortDirections); + inputSortComparator2 = getTypeComparator(executionConfig, inputType2, allSortKeys, allSortDirections); } CoGroupSortListIterator<IN1, IN2> coGroupIterator = new CoGroupSortListIterator<IN1, IN2>(input1, inputSortComparator1, inputComparator1, inputSerializer1, - input2, inputSortComparator2, inputComparator2, inputSerializer2, mutableObjectSafe); + input2, inputSortComparator2, inputComparator2, inputSerializer2, objectReuseDisabled); // -------------------------------------------------------------------- // Run UDF @@ -262,8 +265,8 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, FunctionUtils.openFunction(function, parameters); List<OUT> result = new ArrayList<OUT>(); - Collector<OUT> resultCollector = mutableObjectSafe ? - new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer()) : + Collector<OUT> resultCollector = objectReuseDisabled ? + new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig)) : new ListCollector<OUT>(result); while (coGroupIterator.next()) { @@ -275,12 +278,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, return result; } - private <T> TypeComparator<T> getTypeComparator(TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) { + private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) { if (!(inputType instanceof CompositeType)) { throw new InvalidProgramException("Input types of coGroup must be composite types."); } - return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0); + return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig); } private static class CoGroupSortListIterator<IN1, IN2> { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java index 8ad91c6..62bdfbe 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GenericCollectorMap; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.operators.SingleInputOperator; @@ -52,7 +53,7 @@ public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN // -------------------------------------------------------------------------------------------- @Override - protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) { + protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java index c6ceef0..f20659c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -84,18 +85,20 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2, // -------------------------------------------------------------------------------------------- @Override - protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { CrossFunction<IN1, IN2, OUT> function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); + + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size()); - if (mutableObjectSafeMode) { - TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(); - TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(); - TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(); + if (objectReuseDisabled) { + TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig); + TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig); + TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (IN1 element1 : inputData1) { for (IN2 element2 : inputData2) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java index f945b1d..2986534 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.aggregators.AggregatorRegistry; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -332,7 +333,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab } @Override - protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { + protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java index f4bd537..4db5265 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -50,7 +51,7 @@ public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends Sin } @Override - protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { FlatMapFunction<T, T> function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java index 8312a99..615ba87 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.CopyingListCollector; @@ -53,17 +54,19 @@ public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> e // ------------------------------------------------------------------------ @Override - protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, parameters); ArrayList<OUT> result = new ArrayList<OUT>(input.size()); + + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); - if (mutableObjectSafeMode) { - TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(); - TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(); + if (objectReuseDisabled) { + TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); + TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java index ddfd874..f4f7d0f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.commons.lang3.ArrayUtils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; @@ -152,9 +153,11 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, // -------------------------------------------------------------------------------------------- @Override - protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { GroupReduceFunction<IN, OUT> function = this.userFunction.getUserCodeObject(); + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); + UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo(); TypeInformation<IN> inputType = operatorInfo.getInputType(); @@ -176,7 +179,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, if(sortColumns.length == 0) { // => all reduce. No comparator Preconditions.checkArgument(sortOrderings.length == 0); } else { - final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0); + final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig); Collections.sort(inputData, new Comparator<IN>() { @Override @@ -193,9 +196,9 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, ArrayList<OUT> result = new ArrayList<OUT>(); if (keyColumns.length == 0) { - if (mutableObjectSafeMode) { - final TypeSerializer<IN> inputSerializer = inputType.createSerializer(); - TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(); + if (objectReuseDisabled) { + final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig); + TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); List<IN> inputDataCopy = new ArrayList<IN>(inputData.size()); for (IN in: inputData) { inputDataCopy.add(inputSerializer.copy(in)); @@ -208,14 +211,14 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, function.reduce(inputData, collector); } } else { - final TypeSerializer<IN> inputSerializer = inputType.createSerializer(); + final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig); boolean[] keyOrderings = new boolean[keyColumns.length]; - final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0); + final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig); - ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, mutableObjectSafeMode); + ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, objectReuseDisabled); - if (mutableObjectSafeMode) { - TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(); + if (objectReuseDisabled) { + TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer); while (keyedIterator.nextKey()) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java index 555175d..373846f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RuntimeContext; @@ -139,7 +140,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN @SuppressWarnings("unchecked") @Override - protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafe) throws Exception { + protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception { FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, runtimeContext); @@ -148,22 +149,24 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType(); TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType(); TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType(); + + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); - TypeSerializer<IN1> leftSerializer = mutableObjectSafe ? leftInformation.createSerializer() : null; - TypeSerializer<IN2> rightSerializer = mutableObjectSafe ? rightInformation.createSerializer() : null; + TypeSerializer<IN1> leftSerializer = objectReuseDisabled ? leftInformation.createSerializer(executionConfig) : null; + TypeSerializer<IN2> rightSerializer = objectReuseDisabled ? rightInformation.createSerializer(executionConfig) : null; TypeComparator<IN1> leftComparator; TypeComparator<IN2> rightComparator; if (leftInformation instanceof AtomicType) { - leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true); + leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true, executionConfig); } else if (leftInformation instanceof CompositeType) { int[] keyPositions = getKeyColumns(0); boolean[] orders = new boolean[keyPositions.length]; Arrays.fill(orders, true); - leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0); + leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0, executionConfig); } else { throw new RuntimeException("Type information for left input of type " + leftInformation.getClass() @@ -171,14 +174,14 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN } if (rightInformation instanceof AtomicType) { - rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true); + rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true, executionConfig); } else if (rightInformation instanceof CompositeType) { int[] keyPositions = getKeyColumns(1); boolean[] orders = new boolean[keyPositions.length]; Arrays.fill(orders, true); - rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0); + rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0, executionConfig); } else { throw new RuntimeException("Type information for right input of type " + rightInformation.getClass() @@ -188,7 +191,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator); List<OUT> result = new ArrayList<OUT>(); - Collector<OUT> collector = mutableObjectSafe ? new CopyingListCollector<OUT>(result, outInformation.createSerializer()) + Collector<OUT> collector = objectReuseDisabled ? new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig)) : new ListCollector<OUT>(result); Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>(); @@ -212,7 +215,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN pairComparator.setReference(left); for (IN2 right : matchingHashes) { if (pairComparator.equalToReference(right)) { - if (mutableObjectSafe) { + if (objectReuseDisabled) { function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector); } else { function.join(left, right, collector); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java index 0218bfa..cde3b74 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -54,17 +55,19 @@ public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends S // -------------------------------------------------------------------------------------------- @Override - protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { MapFunction<IN, OUT> function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList<OUT> result = new ArrayList<OUT>(inputData.size()); + + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); - if (mutableObjectSafeMode) { - TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(); - TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(); + if (objectReuseDisabled) { + TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); + TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); for (IN element : inputData) { IN inCopy = inSerializer.copy(element); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java index 7c1fcef..25b3bb8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.CopyingIterator; @@ -57,17 +58,19 @@ public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<I // -------------------------------------------------------------------------------------------- @Override - protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { MapPartitionFunction<IN, OUT> function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList<OUT> result = new ArrayList<OUT>(inputData.size() / 4); + + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); - if (mutableObjectSafeMode) { - TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(); - TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(); + if (objectReuseDisabled) { + TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); + TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); CopyingIterator<IN> source = new CopyingIterator<IN>(inputData.iterator(), inSerializer); CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java index 3602a82..f91d7d8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.NoOpFunction; @@ -88,7 +89,7 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF // -------------------------------------------------------------------------------------------- @Override - protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { + protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { return inputData; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java index f1bf0e9..d3d61e9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.operators.base; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.ReduceFunction; @@ -149,12 +150,13 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI // -------------------------------------------------------------------------------------------- @Override - protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { // make sure we can handle empty inputs if (inputData.isEmpty()) { return Collections.emptyList(); } - + + boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled(); ReduceFunction<T> function = this.userFunction.getUserCodeObject(); UnaryOperatorInformation<T, T> operatorInfo = getOperatorInfo(); @@ -169,11 +171,11 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); - TypeSerializer<T> serializer = getOperatorInfo().getInputType().createSerializer(); + TypeSerializer<T> serializer = getOperatorInfo().getInputType().createSerializer(executionConfig); if (inputColumns.length > 0) { boolean[] inputOrderings = new boolean[inputColumns.length]; - TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0); + TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0, executionConfig); Map<TypeComparable<T>, T> aggregateMap = new HashMap<TypeComparable<T>, T>(inputData.size() / 10); @@ -183,7 +185,7 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI T existing = aggregateMap.get(wrapper); T result; - if (mutableObjectSafeMode) { + if (objectReuseDisabled) { if (existing != null) { result = function.reduce(existing, serializer.copy(next)); } else { @@ -209,7 +211,7 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI else { T aggregate = inputData.get(0); - if (mutableObjectSafeMode) { + if (objectReuseDisabled) { aggregate = serializer.copy(aggregate); for (int i = 1; i < inputData.size(); i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java index 10dbbfe..2fc8a1b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.typeinfo; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -26,5 +27,5 @@ import org.apache.flink.api.common.typeutils.TypeComparator; */ public interface AtomicType<T> { - TypeComparator<T> createComparator(boolean sortOrderAscending); + TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java index 646a549..80f5f63 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeinfo; import java.util.HashMap; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer; import org.apache.flink.api.common.functions.InvalidTypesException; @@ -94,12 +95,12 @@ public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> { @Override @SuppressWarnings("unchecked") - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { // special case the string array if (componentClass.equals(String.class)) { return (TypeSerializer<T>) StringArraySerializer.INSTANCE; } else { - return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer()); + return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer(executionConfig)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java index 61d830a..7bf7298 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java @@ -23,6 +23,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -114,12 +115,12 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T } @Override - public TypeSerializer<T> createSerializer() { + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { return this.serializer; } @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending) { + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { if (comparatorClass != null) { return instantiateComparator(comparatorClass, sortOrderAscending); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java index dba0e6f..367670c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.typeinfo; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.types.Nothing; @@ -54,7 +55,7 @@ public class NothingTypeInfo extends TypeInformation<Nothing> { } @Override - public TypeSerializer<Nothing> createSerializer() { + public TypeSerializer<Nothing> createSerializer(ExecutionConfig executionConfig) { throw new RuntimeException("The Nothing type cannot have a serializer."); } }
