[FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers

This closes #3373


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f99aae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f99aae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f99aae1

Branch: refs/heads/master
Commit: 0f99aae1e1f8b693c2ba79a061046bc042113f0b
Parents: 677b508
Author: Jin Mingjian <[email protected]>
Authored: Tue Feb 21 11:57:21 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |  7 +++++
 .../flink/api/common/ExecutionConfig.java       | 29 ++++++++++++++++++++
 .../api/java/typeutils/GenericTypeInfo.java     |  6 ++++
 .../flink/api/common/ExecutionConfigTest.java   | 25 +++++++++++++++++
 .../graph/StreamingJobGraphGeneratorTest.java   |  8 +++++-
 5 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index e723c33..20ee071 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,6 +306,13 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, 
Class<? extends Serializ
 
 There are different variants of these methods available.
 
+If you do not want to fall back to Kryo, and want to make sure that you have 
explicitly provided your own custom serializers for all POJOs, set
+{% highlight java %}
+env.getConfig().disableGenericTypes();
+{% endhighlight %}
+
+If generic types are disabled, an `UnsupportedOperationException` will be 
thrown when Flink tries to fall back to the default Kryo serializer logic at 
runtime.
+
 ## Defining Type Information using a Factory
 
 A type information factory allows for plugging-in user-defined type 
information into the Flink type system.

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 32ea0a3..3bd91c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,8 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
 
        private boolean forceKryo = false;
 
+       private boolean disableGenericTypes = false;
+
        private boolean objectReuse = false;
 
        private boolean autoTypeRegistrationEnabled = true;
@@ -519,6 +521,31 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        }
 
        /**
+        * Enable generic types.
+        *
+        * @see ExecutionConfig#disableGenericTypes()
+        */
+       public void enableGenericTypes() {
+               disableGenericTypes = false;
+       }
+
+       /**
+        * Disable generic types to make sure that you have provided your own 
custom serializers for
+        * all POJOs explicitly.
+        *
+        * If generic types are disabled,
+        * an {@link UnsupportedOperationException} will be thrown when Flink
+        * tries to fall back to the default Kryo serializer logic in the 
runtime.
+        */
+       public void disableGenericTypes() {
+               disableGenericTypes = true;
+       }
+
+       public boolean hasGenericTypesDisabled() {
+               return disableGenericTypes;
+       }
+
+       /**
         * Force Flink to use the AvroSerializer for POJOs.
         */
        public void enableForceAvro() {
@@ -804,6 +831,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                                ((restartStrategyConfiguration == null && 
other.restartStrategyConfiguration == null) ||
                                        (null != restartStrategyConfiguration 
&& restartStrategyConfiguration.equals(other.restartStrategyConfiguration))) &&
                                forceKryo == other.forceKryo &&
+                               disableGenericTypes == 
other.disableGenericTypes &&
                                objectReuse == other.objectReuse &&
                                autoTypeRegistrationEnabled == 
other.autoTypeRegistrationEnabled &&
                                forceAvro == other.forceAvro &&
@@ -830,6 +858,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                        parallelism,
                        restartStrategyConfiguration,
                        forceKryo,
+                       disableGenericTypes,
                        objectReuse,
                        autoTypeRegistrationEnabled,
                        forceAvro,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index bc4e87a..a4cea31 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -81,6 +81,12 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
        @Override
        @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+               if (config.hasGenericTypesDisabled()) {
+                       throw new UnsupportedOperationException(
+                               "Generic types are disabled for POJOs 
serialization, but type " + this.typeClass +
+                               " is treated as a generic type.");
+               }
+
                return new KryoSerializer<T>(this.typeClass, config);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 883ee6c..4956a9a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.api.common;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -25,6 +29,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ExecutionConfigTest {
 
@@ -64,4 +69,24 @@ public class ExecutionConfigTest {
                assertEquals(parallelism, config.getParallelism());
        }
 
+       @Test
+       public void testForceCustomSerializerCheck() {
+               ExecutionConfig conf = new ExecutionConfig();
+               TypeInformation<Object> typeInfo = new 
GenericTypeInfo<Object>(Object.class);
+               TypeSerializer<Object> serializer = 
typeInfo.createSerializer(conf);
+               assertTrue(serializer instanceof KryoSerializer);
+
+               conf.disableGenericTypes();
+               boolean createSerializerFailed = false;
+               try {
+                       typeInfo.createSerializer(conf);
+               } catch (UnsupportedOperationException e) {
+                       createSerializerFailed = true;
+               } catch (Throwable t) {
+                       fail("Unexpected exception thrown: " + t.getMessage());
+               }
+
+               assertTrue(createSerializerFailed);
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7c51bc2..968b1c9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -55,7 +55,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                StreamGraph streamingJob = new StreamGraph(env);
                StreamingJobGraphGenerator compiler = new 
StreamingJobGraphGenerator(streamingJob);
                
-               boolean closureCleanerEnabled = r.nextBoolean(), 
forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), 
objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
+               boolean closureCleanerEnabled = r.nextBoolean(), 
forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), 
disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), 
sysoutLoggingEnabled = r.nextBoolean();
                int dop = 1 + r.nextInt(10);
                
                ExecutionConfig config = streamingJob.getExecutionConfig();
@@ -74,6 +74,11 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                } else {
                        config.disableForceKryo();
                }
+               if(disableGenericTypes) {
+                       config.disableGenericTypes();
+               } else {
+                       config.enableGenericTypes();
+               }
                if(objectReuseEnabled) {
                        config.enableObjectReuse();
                } else {
@@ -106,6 +111,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                assertEquals(closureCleanerEnabled, 
executionConfig.isClosureCleanerEnabled());
                assertEquals(forceAvroEnabled, 
executionConfig.isForceAvroEnabled());
                assertEquals(forceKryoEnabled, 
executionConfig.isForceKryoEnabled());
+               assertEquals(disableGenericTypes, 
executionConfig.hasGenericTypesDisabled());
                assertEquals(objectReuseEnabled, 
executionConfig.isObjectReuseEnabled());
                assertEquals(sysoutLoggingEnabled, 
executionConfig.isSysoutLoggingEnabled());
                assertEquals(dop, executionConfig.getParallelism());

Reply via email to