[FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers
This closes #3373 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f99aae1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f99aae1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f99aae1 Branch: refs/heads/master Commit: 0f99aae1e1f8b693c2ba79a061046bc042113f0b Parents: 677b508 Author: Jin Mingjian <[email protected]> Authored: Tue Feb 21 11:57:21 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Thu Mar 16 14:43:26 2017 +0100 ---------------------------------------------------------------------- docs/dev/types_serialization.md | 7 +++++ .../flink/api/common/ExecutionConfig.java | 29 ++++++++++++++++++++ .../api/java/typeutils/GenericTypeInfo.java | 6 ++++ .../flink/api/common/ExecutionConfigTest.java | 25 +++++++++++++++++ .../graph/StreamingJobGraphGeneratorTest.java | 8 +++++- 5 files changed, 74 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/docs/dev/types_serialization.md ---------------------------------------------------------------------- diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md index e723c33..20ee071 100644 --- a/docs/dev/types_serialization.md +++ b/docs/dev/types_serialization.md @@ -306,6 +306,13 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ There are different variants of these methods available. +If you do not want to fall back to Kryo and further make sure that you have provided your own custom serializers for all POJOs explicitly, set +{% highlight java %} +env.getConfig().disableGenericTypes(); +{% endhighlight %} + +If generic types disabled, an {@link UnsupportedOperationException} will be thrown when Flink tries to fall back to the default Kryo serializer logic in the runtime. + ## Defining Type Information using a Factory A type information factory allows for plugging-in user-defined type information into the Flink type system. http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 32ea0a3..3bd91c7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -109,6 +109,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut private boolean forceKryo = false; + private boolean disableGenericTypes = false; + private boolean objectReuse = false; private boolean autoTypeRegistrationEnabled = true; @@ -519,6 +521,31 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut } /** + * Enable generic types. + * + * @see ExecutionConfig#disableGenericTypes() + */ + public void enableGenericTypes() { + disableGenericTypes = false; + } + + /** + * Disable generic types to make sure that you have provided your own custom serializers for + * all POJOs explicitly. + * + * If generic types disabled, + * an {@link UnsupportedOperationException} will be thrown when Flink + * tries to fall back to the default Kryo serializer logic in the runtime. + */ + public void disableGenericTypes() { + disableGenericTypes = true; + } + + public boolean hasGenericTypesDisabled() { + return disableGenericTypes; + } + + /** * Force Flink to use the AvroSerializer for POJOs. */ public void enableForceAvro() { @@ -804,6 +831,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut ((restartStrategyConfiguration == null && other.restartStrategyConfiguration == null) || (null != restartStrategyConfiguration && restartStrategyConfiguration.equals(other.restartStrategyConfiguration))) && forceKryo == other.forceKryo && + disableGenericTypes == other.disableGenericTypes && objectReuse == other.objectReuse && autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled && forceAvro == other.forceAvro && @@ -830,6 +858,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut parallelism, restartStrategyConfiguration, forceKryo, + disableGenericTypes, objectReuse, autoTypeRegistrationEnabled, forceAvro, http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index bc4e87a..a4cea31 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -81,6 +81,12 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType @Override @PublicEvolving public TypeSerializer<T> createSerializer(ExecutionConfig config) { + if (config.hasGenericTypesDisabled()) { + throw new UnsupportedOperationException( + "Generic types are disabled for POJOs serialization, but type " + this.typeClass + + " is treated as a generic type."); + } + return new KryoSerializer<T>(this.typeClass, config); } http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java index 883ee6c..4956a9a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java @@ -18,6 +18,10 @@ package org.apache.flink.api.common; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.junit.Test; import java.util.Arrays; @@ -25,6 +29,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ExecutionConfigTest { @@ -64,4 +69,24 @@ public class ExecutionConfigTest { assertEquals(parallelism, config.getParallelism()); } + @Test + public void testForceCustomSerializerCheck() { + ExecutionConfig conf = new ExecutionConfig(); + TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class); + TypeSerializer<Object> serializer = typeInfo.createSerializer(conf); + assertTrue(serializer instanceof KryoSerializer); + + conf.disableGenericTypes(); + boolean createSerializerFailed = false; + try { + typeInfo.createSerializer(conf); + } catch (UnsupportedOperationException e) { + createSerializerFailed = true; + } catch (Throwable t) { + fail("Unexpected exception thrown: " + t.getMessage()); + } + + assertTrue(createSerializerFailed); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 7c51bc2..968b1c9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -55,7 +55,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamGraph streamingJob = new StreamGraph(env); StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob); - boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean(); + boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean(); int dop = 1 + r.nextInt(10); ExecutionConfig config = streamingJob.getExecutionConfig(); @@ -74,6 +74,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { } else { config.disableForceKryo(); } + if(disableGenericTypes) { + config.disableGenericTypes(); + } else { + config.enableGenericTypes(); + } if(objectReuseEnabled) { config.enableObjectReuse(); } else { @@ -106,6 +111,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled()); assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled()); assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled()); + assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled()); assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled()); assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled()); assertEquals(dop, executionConfig.getParallelism());
