[hotfix] [core] Fix/cleanup serialization test for ExecutionConfig

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afd36f98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afd36f98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afd36f98

Branch: refs/heads/master
Commit: afd36f9814ee282df8e3a58e846911f6efa54c61
Parents: d498cbe
Author: Stephan Ewen <[email protected]>
Authored: Wed Mar 15 15:25:20 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfigTest.java   | 76 ++++++++++++++++++-
 .../graph/StreamingJobGraphGeneratorTest.java   | 79 +-------------------
 2 files changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index d000ff9..7e98604 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -22,12 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -39,17 +45,17 @@ public class ExecutionConfigTest {
                List<Class<?>> types = Arrays.<Class<?>>asList(Double.class, 
Integer.class, Double.class);
                List<Class<?>> expectedTypes = 
Arrays.<Class<?>>asList(Double.class, Integer.class);
 
-               for(Class<?> tpe: types) {
+               for (Class<?> tpe: types) {
                        config.registerKryoType(tpe);
                }
 
                int counter = 0;
 
-               for(Class<?> tpe: config.getRegisteredKryoTypes()){
+               for (Class<?> tpe: config.getRegisteredKryoTypes()){
                        assertEquals(tpe, expectedTypes.get(counter++));
                }
 
-               assertTrue(counter == expectedTypes.size());
+               assertEquals(expectedTypes.size(), counter);
        }
 
        @Test
@@ -88,4 +94,68 @@ public class ExecutionConfigTest {
                        // expected
                }
        }
+
+       @Test
+       public void testExecutionConfigSerialization() throws IOException, 
ClassNotFoundException {
+               final Random r = new Random();
+
+               final int parallelism = 1 + r.nextInt(10);
+               final boolean closureCleanerEnabled = r.nextBoolean(), 
+                               forceAvroEnabled = r.nextBoolean(),
+                               forceKryoEnabled = r.nextBoolean(),
+                               disableGenericTypes = r.nextBoolean(),
+                               objectReuseEnabled = r.nextBoolean(),
+                               sysoutLoggingEnabled = r.nextBoolean();
+
+               final ExecutionConfig config = new ExecutionConfig();
+
+               if (closureCleanerEnabled) {
+                       config.enableClosureCleaner();
+               } else {
+                       config.disableClosureCleaner();
+               }
+               if (forceAvroEnabled) {
+                       config.enableForceAvro();
+               } else {
+                       config.disableForceAvro();
+               }
+               if (forceKryoEnabled) {
+                       config.enableForceKryo();
+               } else {
+                       config.disableForceKryo();
+               }
+               if (disableGenericTypes) {
+                       config.disableGenericTypes();
+               } else {
+                       config.enableGenericTypes();
+               }
+               if (objectReuseEnabled) {
+                       config.enableObjectReuse();
+               } else {
+                       config.disableObjectReuse();
+               }
+               if (sysoutLoggingEnabled) {
+                       config.enableSysoutLogging();
+               } else {
+                       config.disableSysoutLogging();
+               }
+               config.setParallelism(parallelism);
+
+               final ExecutionConfig copy1 = 
CommonTestUtils.createCopySerializable(config);
+               final ExecutionConfig copy2 = new 
SerializedValue<>(config).deserializeValue(getClass().getClassLoader());
+
+               assertNotNull(copy1);
+               assertNotNull(copy2);
+
+               assertEquals(config, copy1);
+               assertEquals(config, copy2);
+
+               assertEquals(closureCleanerEnabled, 
copy1.isClosureCleanerEnabled());
+               assertEquals(forceAvroEnabled, copy1.isForceAvroEnabled());
+               assertEquals(forceKryoEnabled, copy1.isForceKryoEnabled());
+               assertEquals(disableGenericTypes, 
copy1.hasGenericTypesDisabled());
+               assertEquals(objectReuseEnabled, copy1.isObjectReuseEnabled());
+               assertEquals(sysoutLoggingEnabled, 
copy1.isSysoutLoggingEnabled());
+               assertEquals(parallelism, copy1.getParallelism());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 968b1c9..5f1973c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -27,96 +26,20 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
-       
-       @Test
-       public void testExecutionConfigSerialization() throws IOException, 
ClassNotFoundException {
-               final long seed = System.currentTimeMillis();
-               final Random r = new Random(seed);
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               StreamGraph streamingJob = new StreamGraph(env);
-               StreamingJobGraphGenerator compiler = new 
StreamingJobGraphGenerator(streamingJob);
-               
-               boolean closureCleanerEnabled = r.nextBoolean(), 
forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), 
disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), 
sysoutLoggingEnabled = r.nextBoolean();
-               int dop = 1 + r.nextInt(10);
-               
-               ExecutionConfig config = streamingJob.getExecutionConfig();
-               if(closureCleanerEnabled) {
-                       config.enableClosureCleaner();
-               } else {
-                       config.disableClosureCleaner();
-               }
-               if(forceAvroEnabled) {
-                       config.enableForceAvro();
-               } else {
-                       config.disableForceAvro();
-               }
-               if(forceKryoEnabled) {
-                       config.enableForceKryo();
-               } else {
-                       config.disableForceKryo();
-               }
-               if(disableGenericTypes) {
-                       config.disableGenericTypes();
-               } else {
-                       config.enableGenericTypes();
-               }
-               if(objectReuseEnabled) {
-                       config.enableObjectReuse();
-               } else {
-                       config.disableObjectReuse();
-               }
-               if(sysoutLoggingEnabled) {
-                       config.enableSysoutLogging();
-               } else {
-                       config.disableSysoutLogging();
-               }
-               config.setParallelism(dop);
-               
-               JobGraph jobGraph = compiler.createJobGraph();
-
-               final String EXEC_CONFIG_KEY = "runtime.config";
-
-               
InstantiationUtil.writeObjectToConfig(jobGraph.getSerializedExecutionConfig(),
-                       jobGraph.getJobConfiguration(),
-                       EXEC_CONFIG_KEY);
-
-               SerializedValue<ExecutionConfig> serializedExecutionConfig = 
InstantiationUtil.readObjectFromConfig(
-                               jobGraph.getJobConfiguration(),
-                               EXEC_CONFIG_KEY,
-                               Thread.currentThread().getContextClassLoader());
 
-               assertNotNull(serializedExecutionConfig);
-
-               ExecutionConfig executionConfig = 
serializedExecutionConfig.deserializeValue(getClass().getClassLoader());
-
-               assertEquals(closureCleanerEnabled, 
executionConfig.isClosureCleanerEnabled());
-               assertEquals(forceAvroEnabled, 
executionConfig.isForceAvroEnabled());
-               assertEquals(forceKryoEnabled, 
executionConfig.isForceKryoEnabled());
-               assertEquals(disableGenericTypes, 
executionConfig.hasGenericTypesDisabled());
-               assertEquals(objectReuseEnabled, 
executionConfig.isObjectReuseEnabled());
-               assertEquals(sysoutLoggingEnabled, 
executionConfig.isSysoutLoggingEnabled());
-               assertEquals(dop, executionConfig.getParallelism());
-       }
-       
        @Test
        public void testParallelismOneNotChained() {
 

Reply via email to