[FLINK-3701] enable reuse of ExecutionConfig

Depending on the context, the ExecutionConfig's type fields may be
deserialized using either a custom class loader or the default class
loader. It may be explicitly serialized for the Task or shipped inside
the PojoSerializer where it is serialized or directly passed in local
mode. Because an ExecutionConfig may be reused, its fields can't be set
to null after it has been shipped once.

The entire ExecutionConfig is now serialized upon setting it on the
JobGraph. It is no longer passed through the JobGraph's constructor but
set explicitly on the JobGraph via setExecutionConfig. If no
ExecutionConfig has been set, the default is used. Unlike before, no
code may modify the ExecutionConfig after it has been set on the
JobGraph.

This closes #1913


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48b469ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48b469ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48b469ad

Branch: refs/heads/master
Commit: 48b469ad4f0da466b347071cea82913965645de3
Parents: 099fdfa
Author: Maximilian Michels <[email protected]>
Authored: Mon May 2 11:41:05 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri May 13 18:01:38 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |  98 +----------
 .../flink/api/common/ExecutionConfigTest.java   |  27 +++
 .../plantranslate/JobGraphGenerator.java        |  14 +-
 .../webmonitor/handlers/JobConfigHandler.java   |   8 +-
 .../BackPressureStatsTrackerITCase.java         |   2 +-
 .../StackTraceSampleCoordinatorITCase.java      |   2 +-
 .../deployment/TaskDeploymentDescriptor.java    |  14 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  16 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  65 +++----
 .../apache/flink/runtime/taskmanager/Task.java  |  30 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |  23 +--
 .../checkpoint/CoordinatorShutdownTest.java     |   4 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../client/JobClientActorRecoveryITCase.java    |   2 +-
 .../runtime/client/JobClientActorTest.java      |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   8 +-
 .../ExecutionGraphConstructionTest.java         |  20 +--
 .../ExecutionGraphDeploymentTest.java           |  10 +-
 .../ExecutionGraphRestartTest.java              |  49 +++---
 .../ExecutionGraphSignalsTest.java              |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   7 +-
 .../ExecutionStateProgressTest.java             | 176 +++++++++----------
 .../executiongraph/LocalInputSplitsTest.java    |  10 +-
 .../executiongraph/PointwisePatternTest.java    |  28 +--
 .../TerminalStateDeadlockTest.java              |   4 +-
 .../VertexLocationConstraintTest.java           |  26 +--
 .../executiongraph/VertexSlotSharingTest.java   |   4 +-
 .../PartialConsumePipelinedResultTest.java      |   3 +-
 .../flink/runtime/jobgraph/JobGraphTest.java    |  13 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java    |   2 +-
 .../runtime/jobmanager/JobManagerTest.java      |   6 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   4 +-
 .../SlotCountExceedingParallelismTest.java      |   2 +-
 .../StandaloneSubmittedJobGraphStoreTest.java   |   2 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |   2 +-
 .../ScheduleOrUpdateConsumersTest.java          |   1 -
 .../LeaderChangeJobRecoveryTest.java            |   5 +-
 .../LeaderChangeStateCleanupTest.java           |   2 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   5 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |   2 +-
 .../runtime/taskmanager/TaskCancelTest.java     |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  32 ++--
 .../flink/runtime/taskmanager/TaskStopTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   5 +-
 .../TaskManagerLossFailsTasksTest.scala         |   8 +-
 .../jobmanager/CoLocationConstraintITCase.scala |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  46 +++--
 .../runtime/jobmanager/RecoveryITCase.scala     |   9 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |   5 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |   4 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../streaming/api/RestartStrategyTest.java      |  10 +-
 .../graph/StreamingJobGraphGeneratorTest.java   |  14 +-
 .../partitioner/RescalePartitionerTest.java     |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   5 +-
 .../JobSubmissionFailsITCase.java               |   7 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   3 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   2 +-
 .../runtime/NetworkStackThroughputITCase.java   |   2 +-
 .../ZooKeeperLeaderElectionITCase.java          |   3 +-
 .../flink/test/web/WebFrontendITCase.java       |   5 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   6 +-
 .../JobManagerLeaderSessionIDITSuite.scala      |   2 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  20 +--
 65 files changed, 437 insertions(+), 486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index c27ee74..d27760f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,11 +22,10 @@ import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.util.SerializedValue;
 
 
-import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -128,7 +127,7 @@ public class ExecutionConfig implements Serializable {
 
        // ------------------------------- User code values 
--------------------------------------------
 
-       private transient GlobalJobParameters globalJobParameters;
+       private GlobalJobParameters globalJobParameters;
 
        // Serializers and types registered with Kryo and the PojoSerializer
        // we store them in linked maps/sets to ensure they are registered in 
order in all kryo instances.
@@ -145,22 +144,6 @@ public class ExecutionConfig implements Serializable {
 
        private LinkedHashSet<Class<?>> registeredPojoTypes = new 
LinkedHashSet<>();
 
-       // ----------------------- Helper values for serialized user objects 
---------------------------
-
-       private SerializedValue<GlobalJobParameters> 
serializedGlobalJobParameters;
-
-       private SerializedValue<LinkedHashMap<Class<?>, 
SerializableSerializer<?>>> serializedRegisteredTypesWithKryoSerializers;
-
-       private SerializedValue<LinkedHashMap<Class<?>, Class<? extends 
Serializer<?>>>> serializedRegisteredTypesWithKryoSerializerClasses;
-
-       private SerializedValue<LinkedHashMap<Class<?>, 
SerializableSerializer<?>>> serializedDefaultKryoSerializers;
-
-       private SerializedValue<LinkedHashMap<Class<?>, Class<? extends 
Serializer<?>>>> serializedDefaultKryoSerializerClasses;
-
-       private SerializedValue<LinkedHashSet<Class<?>>> 
serializedRegisteredKryoTypes;
-
-       private SerializedValue<LinkedHashSet<Class<?>>> 
serializedRegisteredPojoTypes;
-
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -695,79 +678,6 @@ public class ExecutionConfig implements Serializable {
                this.autoTypeRegistrationEnabled = false;
        }
 
-       /**
-        * Deserializes user code objects given a user code class loader
-        *
-        * @param userCodeClassLoader User code class loader
-        * @throws IOException Thrown if an IOException occurs while loading 
the classes
-        * @throws ClassNotFoundException Thrown if the given class cannot be 
loaded
-        */
-       public void deserializeUserCode(ClassLoader userCodeClassLoader) throws 
IOException, ClassNotFoundException {
-               if (serializedRegisteredKryoTypes != null) {
-                       registeredKryoTypes = 
serializedRegisteredKryoTypes.deserializeValue(userCodeClassLoader);
-               } else {
-                       registeredKryoTypes = new LinkedHashSet<>();
-               }
-
-               if (serializedRegisteredPojoTypes != null) {
-                       registeredPojoTypes = 
serializedRegisteredPojoTypes.deserializeValue(userCodeClassLoader);
-               } else {
-                       registeredPojoTypes = new LinkedHashSet<>();
-               }
-
-               if (serializedRegisteredTypesWithKryoSerializerClasses != null) 
{
-                       registeredTypesWithKryoSerializerClasses = 
serializedRegisteredTypesWithKryoSerializerClasses.deserializeValue(userCodeClassLoader);
-               } else {
-                       registeredTypesWithKryoSerializerClasses = new 
LinkedHashMap<>();
-               }
-
-               if (serializedRegisteredTypesWithKryoSerializers != null) {
-                       registeredTypesWithKryoSerializers = 
serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
-               } else {
-                       registeredTypesWithKryoSerializerClasses = new 
LinkedHashMap<>();
-               }
-
-               if (serializedDefaultKryoSerializers != null) {
-                       defaultKryoSerializers = 
serializedDefaultKryoSerializers.deserializeValue(userCodeClassLoader);
-               } else {
-                       defaultKryoSerializers = new LinkedHashMap<>();
-
-               }
-
-               if (serializedDefaultKryoSerializerClasses != null) {
-                       defaultKryoSerializerClasses = 
serializedDefaultKryoSerializerClasses.deserializeValue(userCodeClassLoader);
-               } else {
-                       defaultKryoSerializerClasses = new LinkedHashMap<>();
-               }
-
-               if (serializedGlobalJobParameters != null) {
-                       globalJobParameters = 
serializedGlobalJobParameters.deserializeValue(userCodeClassLoader);
-               }
-       }
-
-       public void serializeUserCode() throws IOException {
-               serializedRegisteredKryoTypes = new 
SerializedValue<>(registeredKryoTypes);
-               registeredKryoTypes = null;
-
-               serializedRegisteredPojoTypes = new 
SerializedValue<>(registeredPojoTypes);
-               registeredPojoTypes = null;
-
-               serializedRegisteredTypesWithKryoSerializerClasses = new 
SerializedValue<>(registeredTypesWithKryoSerializerClasses);
-               registeredTypesWithKryoSerializerClasses = null;
-
-               serializedRegisteredTypesWithKryoSerializers = new 
SerializedValue<>(registeredTypesWithKryoSerializers);
-               registeredTypesWithKryoSerializers = null;
-
-               serializedDefaultKryoSerializers = new 
SerializedValue<>(defaultKryoSerializers);
-               defaultKryoSerializers = null;
-
-               serializedDefaultKryoSerializerClasses = new 
SerializedValue<>(defaultKryoSerializerClasses);
-               defaultKryoSerializerClasses = null;
-
-               serializedGlobalJobParameters = new 
SerializedValue<>(globalJobParameters);
-               globalJobParameters = null;
-       }
-
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof ExecutionConfig) {
@@ -854,10 +764,10 @@ public class ExecutionConfig implements Serializable {
                 * Convert UserConfig into a {@code Map<String, String>} 
representation.
                 * This can be used by the runtime, for example for presenting 
the user config in the web frontend.
                 *
-                * @return Key/Value representation of the UserConfig, or null.
+                * @return Key/Value representation of the UserConfig
                 */
                public Map<String, String> toMap() {
-                       return null;
+                       return Collections.emptyMap();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 158d971..103e06f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.api.common;
 
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
@@ -74,4 +76,29 @@ public class ExecutionConfigTest {
 
                assertEquals(parallelism, config.getParallelism());
        }
+
+       /**
+        * Helper function to create a new ExecutionConfig for tests.
+        * @return A serialized ExecutionConfig
+        */
+       public static SerializedValue<ExecutionConfig> getSerializedConfig() {
+               try {
+                       return new SerializedValue<>(new ExecutionConfig());
+               } catch (IOException e) {
+                       throw new RuntimeException("Couldn't create new 
ExecutionConfig for test.", e);
+               }
+       }
+
+       /**
+        * Deserializes the given ExecutionConfig with the System class loader.
+        * @param serializedConfig The serialized ExecutionConfig
+        * @return ExecutionConfig
+        */
+       public static ExecutionConfig 
deserializeConfig(SerializedValue<ExecutionConfig> serializedConfig) {
+               try {
+                       return 
serializedConfig.deserializeValue(ExecutionConfigTest.class.getClassLoader());
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not deserialize 
ExecutionConfig for test.", e);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 696a05d..a5ae00c 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -83,7 +83,6 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -215,7 +214,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                // ----------- finalize the job graph -----------
 
                // create the job graph object
-               JobGraph graph = new JobGraph(jobId, program.getJobName(), 
program.getOriginalPlan().getExecutionConfig());
+               JobGraph graph = new JobGraph(jobId, program.getJobName());
+               
graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
 
                graph.setAllowQueuedScheduling(false);
                
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
@@ -243,18 +243,10 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                this.iterations = null;
                this.iterationStack = null;
 
-               try {
-                       // make sure that we can send the ExecutionConfig using 
the system class loader
-                       graph.getExecutionConfig().serializeUserCode();
-               } catch (IOException e) {
-                       throw new CompilerException("Could not serialize the 
user code object in the " +
-                               "ExecutionConfig.", e);
-               }
-               
                // return job graph
                return graph;
        }
-       
+
        /**
         * This methods implements the pre-visiting during a depth-first 
traversal. It create the job vertex and
         * sets local strategy.

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 0f2f514..cd63630 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -45,7 +45,13 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
                gen.writeStringField("jid", graph.getJobID().toString());
                gen.writeStringField("name", graph.getJobName());
 
-               ExecutionConfig ec = graph.getExecutionConfig();
+               ExecutionConfig ec;
+               try {
+                       ec = 
graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader());
+               } catch (Exception e) {
+                       throw new RuntimeException("Couldn't deserialize 
ExecutionConfig.", e);
+               }
+
                if (ec != null) {
                        gen.writeObjectFieldStart("execution-config");
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 1f0b2ef..25dc189 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -93,7 +93,7 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                        final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
 
                        // The JobGraph
-                       final JobGraph jobGraph = new JobGraph(new 
ExecutionConfig());
+                       final JobGraph jobGraph = new JobGraph();
                        final int parallelism = 4;
 
                        final JobVertex task = new JobVertex("Task");

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index c6ce315..9b1f608 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -77,7 +77,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                        final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
 
                        // The JobGraph
-                       final JobGraph jobGraph = new JobGraph(new 
ExecutionConfig());
+                       final JobGraph jobGraph = new JobGraph();
                        final int parallelism = 1;
 
                        final JobVertex task = new JobVertex("Task");

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 948f6af..2b1c224 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -90,7 +90,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        private final SerializedValue<StateHandle<?>> operatorState;
 
        /** The execution configuration (see {@link ExecutionConfig}) related 
to the specific job. */
-       private final ExecutionConfig executionConfig;
+       private final SerializedValue<ExecutionConfig> 
serializedExecutionConfig;
 
        private long recoveryTimestamp;
                
@@ -101,7 +101,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        JobID jobID,
                        JobVertexID vertexID,
                        ExecutionAttemptID executionId,
-                       ExecutionConfig executionConfig,
+                       SerializedValue<ExecutionConfig> 
serializedExecutionConfig,
                        String taskName,
                        int indexInSubtaskGroup,
                        int numberOfSubtasks,
@@ -125,7 +125,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                this.jobID = checkNotNull(jobID);
                this.vertexID = checkNotNull(vertexID);
                this.executionId = checkNotNull(executionId);
-               this.executionConfig = checkNotNull(executionConfig);
+               this.serializedExecutionConfig = 
checkNotNull(serializedExecutionConfig);
                this.taskName = checkNotNull(taskName);
                this.indexInSubtaskGroup = indexInSubtaskGroup;
                this.numberOfSubtasks = numberOfSubtasks;
@@ -146,7 +146,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                JobID jobID,
                JobVertexID vertexID,
                ExecutionAttemptID executionId,
-               ExecutionConfig executionConfig,
+               SerializedValue<ExecutionConfig> serializedExecutionConfig,
                String taskName,
                int indexInSubtaskGroup,
                int numberOfSubtasks,
@@ -164,7 +164,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        jobID,
                        vertexID,
                        executionId,
-                       executionConfig,
+                       serializedExecutionConfig,
                        taskName,
                        indexInSubtaskGroup,
                        numberOfSubtasks,
@@ -185,8 +185,8 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
         * Returns the execution configuration (see {@link ExecutionConfig}) 
related to the
         * specific job.
         */
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
+       public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
+               return serializedExecutionConfig;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3796402..5dae785 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -175,7 +175,7 @@ public class ExecutionGraph implements Serializable {
        // ------ Configuration of the Execution -------
 
        /** The execution configuration (see {@link ExecutionConfig}) related 
to this specific job. */
-       private ExecutionConfig executionConfig;
+       private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
         * to deploy them immediately. */
@@ -245,7 +245,7 @@ public class ExecutionGraph implements Serializable {
                        JobID jobId,
                        String jobName,
                        Configuration jobConfig,
-                       ExecutionConfig config,
+                       SerializedValue<ExecutionConfig> serializedConfig,
                        FiniteDuration timeout,
                        RestartStrategy restartStrategy) {
                this(
@@ -253,7 +253,7 @@ public class ExecutionGraph implements Serializable {
                        jobId,
                        jobName,
                        jobConfig,
-                       config,
+                       serializedConfig,
                        timeout,
                        restartStrategy,
                        new ArrayList<BlobKey>(),
@@ -267,7 +267,7 @@ public class ExecutionGraph implements Serializable {
                        JobID jobId,
                        String jobName,
                        Configuration jobConfig,
-                       ExecutionConfig config,
+                       SerializedValue<ExecutionConfig> serializedConfig,
                        FiniteDuration timeout,
                        RestartStrategy restartStrategy,
                        List<BlobKey> requiredJarFiles,
@@ -301,7 +301,7 @@ public class ExecutionGraph implements Serializable {
                this.requiredJarFiles = requiredJarFiles;
                this.requiredClasspaths = requiredClasspaths;
 
-               this.executionConfig = checkNotNull(config);
+               this.serializedExecutionConfig = checkNotNull(serializedConfig);
 
                this.timeout = timeout;
 
@@ -962,12 +962,12 @@ public class ExecutionGraph implements Serializable {
        }
 
        /**
-        * Returns the {@link ExecutionConfig}.
+        * Returns the serialized {@link ExecutionConfig}.
         *
         * @return ExecutionConfig
         */
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
+       public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
+               return serializedExecutionConfig;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4d27423..cbc47a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -667,7 +667,7 @@ public class ExecutionVertex implements Serializable {
                        consumedPartitions.add(new 
InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
                }
 
-               ExecutionConfig config = 
getExecutionGraph().getExecutionConfig();
+               SerializedValue<ExecutionConfig> serializedConfig = 
getExecutionGraph().getSerializedExecutionConfig();
                List<BlobKey> jarFiles = 
getExecutionGraph().getRequiredJarFiles();
                List<URL> classpaths = 
getExecutionGraph().getRequiredClasspaths();
 
@@ -675,7 +675,7 @@ public class ExecutionVertex implements Serializable {
                        getJobId(),
                        getJobvertexId(),
                        executionId,
-                       config,
+                       serializedConfig,
                        getTaskName(),
                        subTaskIndex,
                        getTotalNumberOfParallelSubtasks(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index b7c6551..b3e3739 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -28,6 +28,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -96,79 +98,66 @@ public class JobGraph implements Serializable {
        private List<URL> classpaths = Collections.emptyList();
 
        /** Job specific execution config */
-       private ExecutionConfig executionConfig;
+       private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
        // --------------------------------------------------------------------------------------------
 
        /**
-        * Constructs a new job graph with no name, a random job ID, and the 
given
-        * {@link ExecutionConfig}.
-        *
-        * @param config The {@link ExecutionConfig} for the job.
-        */
-       public JobGraph(ExecutionConfig config) {
-               this(null, config);
-       }
-
-       /**
         * Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
-        * and a random job ID.
+        * and a random job ID. The ExecutionConfig will be serialized and 
can't be modified afterwards.
         *
         * @param jobName The name of the job.
-        * @param config The execution configuration of the job.
         */
-       public JobGraph(String jobName, ExecutionConfig config) {
-               this(null, jobName, config);
+       public JobGraph(String jobName) {
+               this(null, jobName);
        }
 
        /**
         * Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
         * the given name and the given execution configuration (see {@link 
ExecutionConfig}).
+        * The ExecutionConfig will be serialized and can't be modified 
afterwards.
         *
         * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
         * @param jobName The name of the job.
-        * @param config The execution configuration of the job.
         */
-       public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
+       public JobGraph(JobID jobId, String jobName) {
                this.jobID = jobId == null ? new JobID() : jobId;
                this.jobName = jobName == null ? "(unnamed job)" : jobName;
-               this.executionConfig = config == null ? new ExecutionConfig() : 
config;
+               setExecutionConfig(new ExecutionConfig());
        }
 
        /**
         * Constructs a new job graph with no name, a random job ID, the given 
{@link ExecutionConfig}, and
-        * the given job vertices.
+        * the given job vertices. The ExecutionConfig will be serialized and 
can't be modified afterwards.
         *
-        * @param config The execution configuration of the job.
         * @param vertices The vertices to add to the graph.
         */
-       public JobGraph(ExecutionConfig config, JobVertex... vertices) {
-               this(null, config, vertices);
+       public JobGraph(JobVertex... vertices) {
+               this(null, vertices);
        }
 
        /**
         * Constructs a new job graph with the given name, the given {@link 
ExecutionConfig}, a random job ID,
-        * and the given job vertices.
+        * and the given job vertices. The ExecutionConfig will be serialized 
and can't be modified afterwards.
         *
         * @param jobName The name of the job.
-        * @param config The execution configuration of the job.
         * @param vertices The vertices to add to the graph.
         */
-       public JobGraph(String jobName, ExecutionConfig config, JobVertex... 
vertices) {
-               this(null, jobName, config, vertices);
+       public JobGraph(String jobName, JobVertex... vertices) {
+               this(null, jobName, vertices);
        }
 
        /**
         * Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
         * the given jobId or a random one if null supplied, and the given job 
vertices.
+        * The ExecutionConfig will be serialized and can't be modified 
afterwards.
         *
         * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
         * @param jobName The name of the job.
-        * @param config The execution configuration of the job.
         * @param vertices The vertices to add to the graph.
         */
-       public JobGraph(JobID jobId, String jobName, ExecutionConfig config, 
JobVertex... vertices) {
-               this(jobId, jobName, config);
+       public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
+               this(jobId, jobName);
 
                for (JobVertex vertex : vertices) {
                        addVertex(vertex);
@@ -210,8 +199,8 @@ public class JobGraph implements Serializable {
         *
         * @return ExecutionConfig
         */
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
+       public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
+               return serializedExecutionConfig;
        }
 
        /**
@@ -249,6 +238,20 @@ public class JobGraph implements Serializable {
        }
 
        /**
+        * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig
+        * object will not affect this serialized copy.
+        * @param executionConfig The ExecutionConfig to be serialized.
+        */
+       public void setExecutionConfig(ExecutionConfig executionConfig) {
+               Preconditions.checkNotNull(executionConfig, "ExecutionConfig must not be null.");
+               try {
+                       this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not serialize ExecutionConfig.", e);
+               }
+       }
+
+       /**
         * Adds a new task vertex to the job graph if it is not already 
included.
         *
         * @param vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1ae0053..251673f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -219,11 +219,11 @@ public class Task implements Runnable {
 
        private volatile long recoveryTs;
 
-       /** The job specific execution configuration (see {@link ExecutionConfig}). */
-       private final ExecutionConfig executionConfig;
+       /** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
+       private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
-       /** Interval between two successive task cancellation attempts */
-       private final long taskCancellationInterval;
+       /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
+       private long taskCancellationInterval;
 
        /**
         * <p><b>IMPORTANT:</b> This constructor may not start any work that 
would need to
@@ -253,7 +253,11 @@ public class Task implements Runnable {
                this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
                this.operatorState = tdd.getOperatorState();
                this.recoveryTs = tdd.getRecoveryTimestamp();
-               this.executionConfig = checkNotNull(tdd.getExecutionConfig());
+               this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
+
+               this.taskCancellationInterval = jobConfiguration.getLong(
+                       ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
+                       ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
 
                this.memoryManager = checkNotNull(memManager);
                this.ioManager = checkNotNull(ioManager);
@@ -271,15 +275,6 @@ public class Task implements Runnable {
 
                this.executionListenerActors = new 
CopyOnWriteArrayList<ActorGateway>();
 
-               if (executionConfig.getTaskCancellationInterval() < 0) {
-                       taskCancellationInterval = jobConfiguration.getLong(
-                               
ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
-                               
ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
-               } else {
-                       taskCancellationInterval = 
executionConfig.getTaskCancellationInterval();
-               }
-
-
                // create the reader and writer structures
 
                final String taskNameWithSubtaskAndId = taskNameWithSubtask + " 
(" + executionId + ')';
@@ -467,9 +462,14 @@ public class Task implements Runnable {
                        // first of all, get a user-code classloader
                        // this may involve downloading the job's JAR files and/or classes
                        LOG.info("Loading JAR files for task " + taskNameWithSubtask);
+
                        final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache);
+                       final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
 
-                       executionConfig.deserializeUserCode(userCodeClassLoader);
+                       if (executionConfig.getTaskCancellationInterval() >= 0) {
+                               // override task cancellation interval from Flink config if set in ExecutionConfig
+                               taskCancellationInterval = executionConfig.getTaskCancellationInterval();
+                       }
 
                        // now load the task's invokable code
                        invokable = 
loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d8b8a01..3c633f3 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -28,7 +28,6 @@ import akka.actor._
 import akka.pattern.ask
 
 import grizzled.slf4j.Logger
-import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
@@ -46,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, 
RestartStrategyFactory}
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
@@ -1069,11 +1068,14 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val restartStrategy = 
Option(jobGraph.getExecutionConfig().getRestartStrategy())
-          .map(RestartStrategyFactory.createRestartStrategy(_)) match {
-            case Some(strategy) => strategy
-            case None => restartStrategyFactory.createRestartStrategy()
-          }
+        val restartStrategy =
+          Option(jobGraph.getSerializedExecutionConfig()
+            .deserializeValue(userCodeLoader)
+            .getRestartStrategy())
+              .map(RestartStrategyFactory.createRestartStrategy(_)) match {
+                case Some(strategy) => strategy
+                case None => restartStrategyFactory.createRestartStrategy()
+              }
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 
@@ -1088,7 +1090,7 @@ class JobManager(
               jobGraph.getJobID,
               jobGraph.getName,
               jobGraph.getJobConfiguration,
-              jobGraph.getExecutionConfig,
+              jobGraph.getSerializedExecutionConfig,
               timeout,
               restartStrategy,
               jobGraph.getUserJarBlobKeys,
@@ -1197,12 +1199,13 @@ class JobManager(
               new SimpleCheckpointStatsTracker(historySize, ackVertices)
             }
 
-          val jobParallelism = jobGraph.getExecutionConfig.getParallelism()
+          val jobParallelism = jobGraph.getSerializedExecutionConfig
+            .deserializeValue(userCodeLoader).getParallelism()
 
           val parallelism = if (jobParallelism == 
ExecutionConfig.PARALLELISM_AUTO_MAX) {
             numSlots
           } else {
-            jobGraph.getExecutionConfig.getParallelism
+            jobParallelism
           }
 
           executionGraph.enableSnapshotCheckpointing(

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 03ff83d..91a83b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -61,7 +61,7 @@ public class CoordinatorShutdownTest {
                        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
                        
-                       JobGraph testGraph = new JobGraph("test job", new 
ExecutionConfig(), vertex);
+                       JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
                                        5000, 60000, 0L, Integer.MAX_VALUE));
                        
@@ -113,7 +113,7 @@ public class CoordinatorShutdownTest {
                        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
 
-                       JobGraph testGraph = new JobGraph("test job", new 
ExecutionConfig(), vertex);
+                       JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
                                        5000, 60000, 0L, Integer.MAX_VALUE));
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 965556f..a801348 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -50,7 +50,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        new JobID(),
                        "test",
                        new Configuration(),
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        new FiniteDuration(1, TimeUnit.DAYS),
                        new NoRestartStrategy(),
                        Collections.<BlobKey>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
index 865760e..cc1994a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -95,7 +95,7 @@ public class JobClientActorRecoveryITCase extends TestLogger {
                JobVertex blockingVertex = new JobVertex("Blocking Vertex");
                blockingVertex.setInvokableClass(BlockingTask.class);
                blockingVertex.setParallelism(1);
-               final JobGraph jobGraph = new JobGraph("Blocking Test Job", new 
ExecutionConfig(), blockingVertex);
+               final JobGraph jobGraph = new JobGraph("Blocking Test Job", 
blockingVertex);
                final Promise<JobExecutionResult> promise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
 
                Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index ee1fd60..073164c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
 public class JobClientActorTest extends TestLogger {
 
        private static ActorSystem system;
-       private static JobGraph testJobGraph = new JobGraph("Test Job", new 
ExecutionConfig());
+       private static JobGraph testJobGraph = new JobGraph("Test Job");
 
        @BeforeClass
        public static void setup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 63e62bf..36744a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 public class TaskDeploymentDescriptorTest {
@@ -55,7 +57,7 @@ public class TaskDeploymentDescriptorTest {
                        final List<InputGateDeploymentDescriptor> inputGates = 
new ArrayList<InputGateDeploymentDescriptor>(0);
                        final List<BlobKey> requiredJars = new 
ArrayList<BlobKey>(0);
                        final List<URL> requiredClasspaths = new 
ArrayList<URL>(0);
-                       final ExecutionConfig executionConfig = new 
ExecutionConfig();
+                       final SerializedValue<ExecutionConfig> executionConfig 
= ExecutionConfigTest.getSerializedConfig();
 
                        final TaskDeploymentDescriptor orig = new 
TaskDeploymentDescriptor(jobID, vertexID, execId,
                                executionConfig, taskName, indexInSubtaskGroup, 
currentNumberOfSubtasks, attemptNumber,
@@ -78,9 +80,7 @@ public class TaskDeploymentDescriptorTest {
                        assertEquals(orig.getAttemptNumber(), 
copy.getAttemptNumber());
                        assertEquals(orig.getProducedPartitions(), 
copy.getProducedPartitions());
                        assertEquals(orig.getInputGates(), 
copy.getInputGates());
-                       // load serialized values in ExecutionConfig
-                       
copy.getExecutionConfig().deserializeUserCode(getClass().getClassLoader());
-                       assertEquals(orig.getExecutionConfig(), 
copy.getExecutionConfig());
+                       assertEquals(orig.getSerializedExecutionConfig(), 
copy.getSerializedExecutionConfig());
 
                        assertEquals(orig.getRequiredJarFiles(), 
copy.getRequiredJarFiles());
                        assertEquals(orig.getRequiredClasspaths(), 
copy.getRequiredClasspaths());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index d845d01..8eebe66 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -107,7 +107,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -152,7 +152,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -220,7 +220,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -475,7 +475,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -532,7 +532,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -594,7 +594,7 @@ public class ExecutionGraphConstructionTest {
                                jobId, 
                                jobName, 
                                cfg, 
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                        try {
@@ -640,7 +640,7 @@ public class ExecutionGraphConstructionTest {
                                jobId, 
                                jobName,
                                cfg, 
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
 
@@ -705,14 +705,14 @@ public class ExecutionGraphConstructionTest {
                        JobVertex v8 = new JobVertex("vertex8");
                        v8.setParallelism(2);
 
-                       JobGraph jg = new JobGraph(jobId, jobName, new 
ExecutionConfig(), v1, v2, v3, v4, v5, v6, v7, v8);
+                       JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, 
v4, v5, v6, v7, v8);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(), 
                                jobId, 
                                jobName, 
                                cfg, 
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7a9cee7..d126acb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -30,7 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -84,8 +84,8 @@ public class ExecutionGraphDeploymentTest {
                                TestingUtils.defaultExecutionContext(), 
                                jobId, 
                                "some job", 
-                               new Configuration(), 
-                               new ExecutionConfig(),
+                               new Configuration(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
 
@@ -289,7 +289,7 @@ public class ExecutionGraphDeploymentTest {
                        jobId, 
                        "some job", 
                        new Configuration(),
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                
@@ -332,4 +332,4 @@ public class ExecutionGraphDeploymentTest {
                        throw new Exception();
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0837927..01cca5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
@@ -73,14 +74,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                sender.setInvokableClass(Tasks.NoOpInvokable.class);
                sender.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", new 
ExecutionConfig(), sender);
+               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
                ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(),
                                new JobID(),
                                "test job",
                                new Configuration(),
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -129,13 +130,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
                groupVertex.setStrictlyCoLocatedWith(groupVertex2);
                
                //initiate and schedule job
-               JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), groupVertex, groupVertex2);
+               JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2);
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "test job",
                        new Configuration(),
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 0L));
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -184,14 +185,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                sender.setInvokableClass(Tasks.NoOpInvokable.class);
                sender.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender);
+               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
                ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(),
                                new JobID(),
                                "Test job",
                                new Configuration(),
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000));
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -220,7 +221,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "TestJob",
                                new Configuration(),
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                // We want to manually control the restart and 
delay
                                new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -229,7 +230,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
                jobVertex.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("TestJob", new ExecutionConfig(), jobVertex);
+               JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
 
                
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -279,7 +280,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "TestJob",
                                new Configuration(),
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                // We want to manually control the restart and 
delay
                                new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -295,7 +296,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
                jobVertex.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("TestJob", new ExecutionConfig(), jobVertex);
+               JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
 
                
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -355,14 +356,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                sender.setInvokableClass(Tasks.NoOpInvokable.class);
                sender.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender);
+               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
                ExecutionGraph eg = spy(new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "Test job",
                        new Configuration(),
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 1000)));
 
@@ -426,14 +427,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                receiver.setInvokableClass(Tasks.NoOpInvokable.class);
                receiver.setParallelism(1);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver);
+               JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
 
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "test job",
                        new Configuration(),
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 1000));
 
@@ -518,16 +519,18 @@ public class ExecutionGraphRestartTest extends TestLogger {
                vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                vertex.setParallelism(1);
 
-               JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex);
-               jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(
-                               Integer.MAX_VALUE, Integer.MAX_VALUE));
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+                       Integer.MAX_VALUE, Integer.MAX_VALUE));
+               JobGraph jobGraph = new JobGraph("Test Job", vertex);
+               jobGraph.setExecutionConfig(executionConfig);
 
                ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(),
                                new JobID(),
                                "test job",
                                new Configuration(),
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000000));
 
@@ -570,16 +573,18 @@ public class ExecutionGraphRestartTest extends TestLogger {
                vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                vertex.setParallelism(1);
 
-               JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex);
-               jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(
-                               Integer.MAX_VALUE, Integer.MAX_VALUE));
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+                       Integer.MAX_VALUE, Integer.MAX_VALUE));
+               JobGraph jobGraph = new JobGraph("Test Job", vertex);
+               jobGraph.setExecutionConfig(executionConfig);
 
                ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(),
                                new JobID(),
                                "test job",
                                new Configuration(),
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000000));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index d1bb680..8b04fa3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StoppingException;
@@ -128,7 +128,7 @@ public class ExecutionGraphSignalsTest {
                        jobId,
                        jobName,
                        cfg,
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                eg.attachJobGraph(ordered);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 6659b5a..92a7402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -22,10 +22,11 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -176,7 +177,7 @@ public class ExecutionGraphTestUtils {
                        new JobID(), 
                        "test job", 
                        new Configuration(), 
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
 
@@ -197,7 +198,7 @@ public class ExecutionGraphTestUtils {
                return ejv;
        }
        
-       public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
+       public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException, IOException {
                return getExecutionVertex(id, 
TestingUtils.defaultExecutionContext());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 1ff90e1..9e4aa6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -1,88 +1,88 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
-       @Test
-       public void testAccumulatedStateFinished() {
-               try {
-                       final JobID jid = new JobID();
-                       final JobVertexID vid = new JobVertexID();
-
-                       JobVertex ajv = new JobVertex("TestVertex", vid);
-                       ajv.setParallelism(3);
-                       ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
-                       ExecutionGraph graph = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(), 
-                               jid, 
-                               "test job", 
-                               new Configuration(), 
-                new ExecutionConfig(),
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
-                       graph.attachJobGraph(Arrays.asList(ajv));
-
-                       setGraphStatus(graph, JobStatus.RUNNING);
-
-                       ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
-                       // mock resources and mock taskmanager
-                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               SimpleSlot slot = getInstance(
-                                               new SimpleActorGateway(
-                                                               
TestingUtils.defaultExecutionContext())
-                               ).allocateSimpleSlot(jid);
-                               ee.deployToSlot(slot);
-                       }
-
-                       // finish all
-                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               ee.executionFinished();
-                       }
-
-                       assertTrue(ejv.isInFinalState());
-                       assertEquals(JobStatus.FINISHED, graph.getState());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+
+public class ExecutionStateProgressTest {
+
+       @Test
+       public void testAccumulatedStateFinished() {
+               try {
+                       final JobID jid = new JobID();
+                       final JobVertexID vid = new JobVertexID();
+
+                       JobVertex ajv = new JobVertex("TestVertex", vid);
+                       ajv.setParallelism(3);
+                       ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
+
+                       ExecutionGraph graph = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(), 
+                               jid, 
+                               "test job", 
+                               new Configuration(),
+                               ExecutionConfigTest.getSerializedConfig(),
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
+                       graph.attachJobGraph(Arrays.asList(ajv));
+
+                       setGraphStatus(graph, JobStatus.RUNNING);
+
+                       ExecutionJobVertex ejv = graph.getJobVertex(vid);
+
+                       // mock resources and mock taskmanager
+                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
+                               SimpleSlot slot = getInstance(
+                                               new SimpleActorGateway(
+                                                               
TestingUtils.defaultExecutionContext())
+                               ).allocateSimpleSlot(jid);
+                               ee.deployToSlot(slot);
+                       }
+
+                       // finish all
+                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
+                               ee.executionFinished();
+                       }
+
+                       assertTrue(ejv.isInFinalState());
+                       assertEquals(JobStatus.FINISHED, graph.getState());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index a4c86e3..d7ce0ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.when;
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -266,14 +266,14 @@ public class LocalInputSplitsTest {
                        vertex.setInvokableClass(DummyInvokable.class);
                        vertex.setInputSplitSource(new 
TestInputSplitSource(splits));
                        
-                       JobGraph jobGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
+                       JobGraph jobGraph = new JobGraph("test job", vertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(), 
                                jobGraph.getJobID(),
                                jobGraph.getName(),  
                                jobGraph.getJobConfiguration(),
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                TIMEOUT,
                                new NoRestartStrategy());
                        
@@ -331,14 +331,14 @@ public class LocalInputSplitsTest {
                vertex.setInvokableClass(DummyInvokable.class);
                vertex.setInputSplitSource(new TestInputSplitSource(splits));
                
-               JobGraph jobGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
+               JobGraph jobGraph = new JobGraph("test job", vertex);
                
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        jobGraph.getJobID(),
                        jobGraph.getName(),  
                        jobGraph.getJobConfiguration(),
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        TIMEOUT,
                        new NoRestartStrategy());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index cbeeded..1b369db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -64,7 +64,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -105,8 +105,8 @@ public class PointwisePatternTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       new ExecutionConfig(),
+                       cfg,
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -148,8 +148,8 @@ public class PointwisePatternTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       new ExecutionConfig(),
+                       cfg,
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -192,8 +192,8 @@ public class PointwisePatternTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName,
-                       cfg, 
-                       new ExecutionConfig(),
+                       cfg,
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -234,8 +234,8 @@ public class PointwisePatternTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       new ExecutionConfig(),
+                       cfg,
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -296,8 +296,8 @@ public class PointwisePatternTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       new ExecutionConfig(),
+                       cfg,
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -349,8 +349,8 @@ public class PointwisePatternTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       new ExecutionConfig(),
+                       cfg,
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index a28fb49..8bc474b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -188,7 +188,7 @@ public class TerminalStateDeadlockTest {
                                jobId,
                                "test graph",
                                EMPTY_CONFIG,
-                               new ExecutionConfig(),
+                               ExecutionConfigTest.getSerializedConfig(),
                                TIMEOUT,
                                new FixedDelayRestartStrategy(1, 0));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index d866b2f..c483f41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.DummyActorGateway;
@@ -76,14 +76,14 @@ public class VertexLocationConstraintTest {
                        JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
                        jobVertex.setInvokableClass(DummyInvokable.class);
                        jobVertex.setParallelism(2);
-                       JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       new ExecutionConfig(),
+                                       ExecutionConfigTest.getSerializedConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -149,14 +149,14 @@ public class VertexLocationConstraintTest {
                        JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
                        jobVertex.setInvokableClass(DummyInvokable.class);
                        jobVertex.setParallelism(2);
-                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       new ExecutionConfig(),
+                                       
ExecutionConfigTest.getSerializedConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -226,14 +226,14 @@ public class VertexLocationConstraintTest {
                        jobVertex1.setSlotSharingGroup(sharingGroup);
                        jobVertex2.setSlotSharingGroup(sharingGroup);
                        
-                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex1, jobVertex2);
+                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       new ExecutionConfig(),
+                                       
ExecutionConfigTest.getSerializedConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
@@ -294,14 +294,14 @@ public class VertexLocationConstraintTest {
                        JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
                        jobVertex.setInvokableClass(DummyInvokable.class);
                        jobVertex.setParallelism(1);
-                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       new ExecutionConfig(),
+                                       
ExecutionConfigTest.getSerializedConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -360,7 +360,7 @@ public class VertexLocationConstraintTest {
                        jobVertex1.setParallelism(1);
                        jobVertex2.setParallelism(1);
                        
-                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex1, jobVertex2);
+                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
                        
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
                        jobVertex1.setSlotSharingGroup(sharingGroup);
@@ -371,7 +371,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       new ExecutionConfig(),
+                                       
ExecutionConfigTest.getSerializedConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
@@ -404,14 +404,14 @@ public class VertexLocationConstraintTest {
        public void testArchivingClearsFields() {
                try {
                        JobVertex vertex = new JobVertex("test vertex", new 
JobVertexID());
-                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), vertex);
+                       JobGraph jg = new JobGraph("test job", vertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       new ExecutionConfig(),
+                                       
ExecutionConfigTest.getSerializedConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(vertex));

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 5110249..7a23e26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -76,7 +76,7 @@ public class VertexSlotSharingTest {
                                        new JobID(),
                                        "test job",
                                        new Configuration(),
-                                       new ExecutionConfig(),
+                                       
ExecutionConfigTest.getSerializedConfig(),
                                        AkkaUtils.getDefaultTimeout(),
                                        new NoRestartStrategy());
                        eg.attachJobGraph(vertices);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 317eed7..af8aa69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -89,8 +89,7 @@ public class PartialConsumePipelinedResultTest {
                receiver.connectNewDataSetAsInput(
                                sender, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
 
-               final JobGraph jobGraph = new JobGraph(
-                               "Partial Consume of Pipelined Result", new ExecutionConfig(), sender, receiver);
+               final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
 
                final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
                                sender.getID(), receiver.getID());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 68b05b2..74f1adf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.*;
 
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
@@ -32,7 +31,7 @@ public class JobGraphTest {
        @Test
        public void testSerialization() {
                try {
-                       JobGraph jg = new JobGraph("The graph", new 
ExecutionConfig());
+                       JobGraph jg = new JobGraph("The graph");
                        
                        // add some configuration values
                        {
@@ -91,7 +90,7 @@ public class JobGraphTest {
                        intermediate2.connectNewDataSetAsInput(intermediate1, 
DistributionPattern.POINTWISE);
                        intermediate1.connectNewDataSetAsInput(source2, 
DistributionPattern.POINTWISE);
 
-                       JobGraph graph = new JobGraph("TestGraph", new 
ExecutionConfig(),
+                       JobGraph graph = new JobGraph("TestGraph",
                                source1, source2, intermediate1, intermediate2, 
target1, target2);
                        List<JobVertex> sorted = 
graph.getVerticesSortedTopologicallyFromSources();
                        
@@ -136,7 +135,7 @@ public class JobGraphTest {
                        
                        l13.connectNewDataSetAsInput(source2, 
DistributionPattern.POINTWISE);
 
-                       JobGraph graph = new JobGraph("TestGraph", new 
ExecutionConfig(),
+                       JobGraph graph = new JobGraph("TestGraph",
                                source1, source2, root, l11, l13, l12, l2);
                        List<JobVertex> sorted = 
graph.getVerticesSortedTopologicallyFromSources();
                        
@@ -183,7 +182,7 @@ public class JobGraphTest {
                        op2.connectNewDataSetAsInput(source, 
DistributionPattern.POINTWISE);
                        op3.connectNewDataSetAsInput(op2, 
DistributionPattern.POINTWISE);
 
-                       JobGraph graph = new JobGraph("TestGraph", new 
ExecutionConfig(), source, op1, op2, op3);
+                       JobGraph graph = new JobGraph("TestGraph", source, op1, 
op2, op3);
                        List<JobVertex> sorted = 
graph.getVerticesSortedTopologicallyFromSources();
                        
                        assertEquals(4,  sorted.size());
@@ -212,7 +211,7 @@ public class JobGraphTest {
                        v3.connectNewDataSetAsInput(v2, 
DistributionPattern.POINTWISE);
                        v4.connectNewDataSetAsInput(v3, 
DistributionPattern.POINTWISE);
 
-                       JobGraph jg = new JobGraph("Cyclic Graph", new 
ExecutionConfig(), v1, v2, v3, v4);
+                       JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, 
v4);
                        try {
                                jg.getVerticesSortedTopologicallyFromSources();
                                fail("Failed to raise error on topologically sorting cyclic graph.");
@@ -244,7 +243,7 @@ public class JobGraphTest {
                        v4.connectNewDataSetAsInput(v3, 
DistributionPattern.POINTWISE);
                        target.connectNewDataSetAsInput(v3, 
DistributionPattern.POINTWISE);
 
-                       JobGraph jg = new JobGraph("Cyclic Graph", new 
ExecutionConfig(), v1, v2, v3, v4, source, target);
+                       JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, 
v4, source, target);
                        try {
                                jg.getVerticesSortedTopologicallyFromSources();
                                fail("Failed to raise error on topologically sorting cyclic graph.");

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index 612f64f..d1d5f03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -67,7 +67,7 @@ public class JsonGeneratorTest {
                        sink1.connectNewDataSetAsInput(join2, 
DistributionPattern.POINTWISE);
                        sink2.connectNewDataSetAsInput(join1, 
DistributionPattern.ALL_TO_ALL);
 
-                       JobGraph jg = new JobGraph("my job", new 
ExecutionConfig(), source1, source2, source3,
+                       JobGraph jg = new JobGraph("my job", source1, source2, 
source3,
                                        intermediate1, intermediate2, join1, 
join2, sink1, sink2);
                        
                        String plan = JsonPlanGenerator.generatePlan(jg);

Reply via email to