Repository: flink
Updated Branches:
  refs/heads/master 099fdfa0c -> 96b353d98


http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 3785fc7..5c25003 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -126,7 +126,7 @@ public class JobManagerTest {
                                                
sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
                                                
sender.createAndAddResultDataSet(rid, PIPELINED);
 
-                                               final JobGraph jobGraph = new 
JobGraph("Blocking test job", new ExecutionConfig(), sender);
+                                               final JobGraph jobGraph = new 
JobGraph("Blocking test job", sender);
                                                final JobID jid = 
jobGraph.getJobID();
 
                                                final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(
@@ -253,7 +253,7 @@ public class JobManagerTest {
                                                sender.setParallelism(2);
                                                
sender.setInvokableClass(StoppableInvokable.class);
 
-                                               final JobGraph jobGraph = new 
JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender);
+                                               final JobGraph jobGraph = new 
JobGraph("Stoppable streaming test job", sender);
                                                final JobID jid = 
jobGraph.getJobID();
 
                                                final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
@@ -305,7 +305,7 @@ public class JobManagerTest {
                                                sender.setParallelism(1);
                                                
sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
 
-                                               final JobGraph jobGraph = new 
JobGraph("Non-Stoppable batching test job", new ExecutionConfig(), sender);
+                                               final JobGraph jobGraph = new 
JobGraph("Non-Stoppable batching test job", sender);
                                                final JobID jid = 
jobGraph.getJobID();
 
                                                final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index e820ed6..959b9a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -109,7 +109,7 @@ public class JobSubmitTest {
                        // create a simple job graph
                        JobVertex jobVertex = new JobVertex("Test Vertex");
                        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
 
                        // request the blob port from the job manager
                        Future<Object> future = 
jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
@@ -173,7 +173,7 @@ public class JobSubmitTest {
                        };
 
                        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
 
                        // submit the job
                        Future<Object> submitFuture = jmGateway.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index dfb0b91..561bda3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -104,7 +104,7 @@ public class SlotCountExceedingParallelismTest {
                                DistributionPattern.ALL_TO_ALL,
                                ResultPartitionType.BLOCKING);
 
-               final JobGraph jobGraph = new JobGraph(jobName, new 
ExecutionConfig(), sender, receiver);
+               final JobGraph jobGraph = new JobGraph(jobName, sender, 
receiver);
 
                // We need to allow queued scheduling, because there are not 
enough slots available
                // to run all tasks at once. We queue tasks and then let them 
finish/consume the blocking

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index ca2ecf5..8ebb7f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -38,7 +38,7 @@ public class StandaloneSubmittedJobGraphStoreTest {
                StandaloneSubmittedJobGraphStore jobGraphs = new 
StandaloneSubmittedJobGraphStore();
 
                SubmittedJobGraph jobGraph = new SubmittedJobGraph(
-                               new JobGraph("testNoOps", new 
ExecutionConfig()),
+                               new JobGraph("testNoOps"),
                                new JobInfo(ActorRef.noSender(), 
ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
 
                assertEquals(0, jobGraphs.recoverJobGraphs().size());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 5e53596..c71bd35 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -261,7 +261,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
        // 
---------------------------------------------------------------------------------------------
 
        private SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long 
start) {
-               final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph", 
new ExecutionConfig());
+               final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph");
 
                final JobVertex jobVertex = new JobVertex("Test JobVertex");
                jobVertex.setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 07fc2c5..b03c38b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -113,7 +113,6 @@ public class ScheduleOrUpdateConsumersTest {
 
                final JobGraph jobGraph = new JobGraph(
                                "Mixed pipelined and blocking result",
-                               new ExecutionConfig(),
                                sender,
                                pipelinedReceiver,
                                blockingReceiver);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index a2cefb6..ccd2156 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -140,7 +140,10 @@ public class LeaderChangeJobRecoveryTest extends 
TestLogger {
 
                ExecutionConfig executionConfig = new ExecutionConfig();
 
-               return new JobGraph("Blocking test job", executionConfig, 
sender, receiver);
+               JobGraph jobGraph = new JobGraph("Blocking test job", sender, 
receiver);
+               jobGraph.setExecutionConfig(executionConfig);
+
+               return jobGraph;
        }
 
        public static class TestActorGateway implements ActorGateway {

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 6d938ac..2eacdee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -269,6 +269,6 @@ public class LeaderChangeStateCleanupTest extends 
TestLogger {
                sender.setSlotSharingGroup(slotSharingGroup);
                receiver.setSlotSharingGroup(slotSharingGroup);
 
-               return new JobGraph("Blocking test job", new ExecutionConfig(), 
sender, receiver);
+               return new JobGraph("Blocking test job", sender, receiver);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 9d33920..1b463bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -149,7 +149,8 @@ public class TaskAsyncCallTest {
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), new JobVertexID(), new 
ExecutionAttemptID(),
-                               new ExecutionConfig(), "Test Task", 0, 1, 0,
+                               ExecutionConfigTest.getSerializedConfig(),
+                               "Test Task", 0, 1, 0,
                                new Configuration(), new Configuration(),
                                CheckpointsInOrderInvokable.class.getName(),
                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index f0e72d7..a093233 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -97,7 +97,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends 
TestLogger {
                        producer.setSlotSharingGroup(slot);
                        consumer.setSlotSharingGroup(slot);
 
-                       JobGraph jobGraph = new JobGraph(new ExecutionConfig(), 
producer, consumer);
+                       JobGraph jobGraph = new JobGraph(producer, consumer);
 
                        // Submit job and wait until running
                        ActorGateway jobManager = 
flink.getLeaderGateway(deadline.timeLeft());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index e7f4c5c..09dd817 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -74,7 +74,7 @@ public class TaskCancelTest {
                        flink.start();
 
                        // Setup
-                       final JobGraph jobGraph = new JobGraph("Cancel Big 
Union", new ExecutionConfig());
+                       final JobGraph jobGraph = new JobGraph("Cancel Big 
Union");
 
                        JobVertex[] sources = new JobVertex[numberOfSources];
                        SlotSharingGroup group = new SlotSharingGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 9e7abb6..3ee9a84 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -24,6 +24,7 @@ import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -66,6 +67,7 @@ import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -158,7 +160,7 @@ public class TaskManagerTest extends TestLogger {
                                final JobID jid = new JobID();
                                final JobVertexID vid = new JobVertexID();
                                final ExecutionAttemptID eid = new 
ExecutionAttemptID();
-                               final ExecutionConfig executionConfig = new 
ExecutionConfig();
+                               final SerializedValue<ExecutionConfig> 
executionConfig = ExecutionConfigTest.getSerializedConfig();
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, vid, eid, executionConfig,
                                                "TestTask", 2, 7, 0, new 
Configuration(), new Configuration(),
@@ -262,14 +264,14 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1,
-                                               new ExecutionConfig(), 
"TestTask1", 1, 5, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "TestTask1", 1, 5, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2,
-                                               new ExecutionConfig(), 
"TestTask2", 2, 7, 0,
+                                                
ExecutionConfigTest.getSerializedConfig(), "TestTask2", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -395,13 +397,15 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1, new ExecutionConfig(),
+                               final SerializedValue<ExecutionConfig> 
executionConfig = ExecutionConfigTest.getSerializedConfig();
+
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1, executionConfig,
                                                "TestTask1", 1, 5, 0, new 
Configuration(), new Configuration(), StoppableInvokable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2, new ExecutionConfig(),
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2, executionConfig,
                                                "TestTask2", 2, 7, 0, new 
Configuration(), new Configuration(), 
TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -522,14 +526,14 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1,
-                                               new ExecutionConfig(), 
"Sender", 0, 1, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2,
-                                               new ExecutionConfig(), 
"Receiver", 2, 7, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -623,13 +627,13 @@ public class TaskManagerTest extends TestLogger {
                                                );
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1,
-                                               new ExecutionConfig(), 
"Sender", 0, 1, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(), new 
ArrayList<BlobKey>(),
                                                Collections.<URL>emptyList(), 
0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2,
-                                               new ExecutionConfig(), 
"Receiver", 2, 7, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.singletonList(ircdd),
@@ -764,13 +768,13 @@ public class TaskManagerTest extends TestLogger {
                                                );
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1,
-                                               new ExecutionConfig(), 
"Sender", 0, 1, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2,
-                                               new ExecutionConfig(), 
"Receiver", 2, 7, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.BlockingReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.singletonList(ircdd),
@@ -909,7 +913,7 @@ public class TaskManagerTest extends TestLogger {
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
                                                jid, vid, eid,
-                                               new ExecutionConfig(), 
"Receiver", 0, 1, 0,
+                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
                                                
Tasks.AgnosticReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -1003,7 +1007,7 @@ public class TaskManagerTest extends TestLogger {
                                                new 
InputGateDeploymentDescriptor(resultId, 0, icdd);
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                               jid, vid, eid, new 
ExecutionConfig(), "Receiver", 0, 1, 0,
+                                               jid, vid, eid, 
ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
                                                
Tasks.AgnosticReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -1079,7 +1083,7 @@ public class TaskManagerTest extends TestLogger {
                                                new JobID(),
                                                new JobVertexID(),
                                                new ExecutionAttemptID(),
-                                               new ExecutionConfig(),
+                                               
ExecutionConfigTest.getSerializedConfig(),
                                                "Task",
                                                0,
                                                1,

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index aa37d47..99e037d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -62,7 +63,7 @@ public class TaskStopTest {
                
when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
                
when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
                
when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
-               
when(tddMock.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+               
when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
                when(tddMock.getInvokableClassName()).thenReturn("className");
 
                task = new Task(tddMock, mock(MemoryManager.class), 
mock(IOManager.class), mock(NetworkEnvironment.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 034681e..06f393f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -628,7 +628,8 @@ public class TaskTest {
        private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? 
extends AbstractInvokable> invokable) {
                return new TaskDeploymentDescriptor(
                                new JobID(), new JobVertexID(), new 
ExecutionAttemptID(),
-                               new ExecutionConfig(), "Test Task", 0, 1, 0,
+                               ExecutionConfigTest.getSerializedConfig(),
+                               "Test Task", 0, 1, 0,
                                new Configuration(), new Configuration(),
                                invokable.getName(),
                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index ef3dae4..1927c39 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.executiongraph
 
-import org.apache.flink.api.common.{ExecutionConfig, JobID}
+import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest, 
JobID}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
@@ -50,14 +50,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike 
with Matchers {
         sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
         sender.setParallelism(20)
 
-        val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender)
+        val jobGraph = new JobGraph("Pointwise job", sender)
 
         val eg = new ExecutionGraph(
           TestingUtils.defaultExecutionContext,
           new JobID(),
           "test job",
           new Configuration(),
-          new ExecutionConfig,
+          ExecutionConfigTest.getSerializedConfig,
           AkkaUtils.getDefaultTimeout,
           new NoRestartStrategy())
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index f52d37e..12e2d63 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -68,7 +68,7 @@ class CoLocationConstraintITCase(_system: ActorSystem)
 
       receiver.setStrictlyCoLocatedWith(sender)
 
-      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 894ba38..2b5b29f 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -68,7 +68,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(2)
       vertex.setInvokableClass(classOf[BlockingNoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
+      val jobGraph = new JobGraph("Test Job", vertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -110,7 +110,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
+      val jobGraph = new JobGraph("Test Job", vertex)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -145,7 +145,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test job", new ExecutionConfig(), vertex)
+      val jobGraph = new JobGraph("Test job", vertex)
       jobGraph.setAllowQueuedScheduling(true)
 
       val cluster = TestingUtils.startTestingCluster(10)
@@ -181,7 +181,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -216,7 +216,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -253,8 +253,7 @@ class JobManagerITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(),
-        sender1, receiver, sender2)
+      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -299,8 +298,7 @@ class JobManagerITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(),
-        sender1, receiver, sender2)
+      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -342,8 +340,7 @@ class JobManagerITCase(_system: ActorSystem)
       forwarder.connectNewDataSetAsInput(sender, 
DistributionPattern.ALL_TO_ALL)
       receiver.connectNewDataSetAsInput(forwarder, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Forwarding Job", new ExecutionConfig(),
-        sender, forwarder, receiver)
+      val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, 
receiver)
 
       jobGraph.setScheduleMode(ScheduleMode.ALL)
 
@@ -379,7 +376,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -427,7 +424,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -472,7 +469,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -512,7 +509,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -560,7 +557,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -603,8 +600,7 @@ class JobManagerITCase(_system: ActorSystem)
       source.setParallelism(num_tasks)
       sink.setParallelism(num_tasks)
 
-      val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition",
-        new ExecutionConfig(), source, sink)
+      val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, 
sink)
 
       val cluster = TestingUtils.startTestingCluster(2*num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -630,12 +626,12 @@ class JobManagerITCase(_system: ActorSystem)
       val vertex = new JobVertex("Test Vertex")
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph1 = new JobGraph("Test Job", new ExecutionConfig(), vertex)
+      val jobGraph1 = new JobGraph("Test Job", vertex)
 
       val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 
2000)
       slowVertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph2 = new JobGraph("Long running Job", new ExecutionConfig(), 
slowVertex)
+      val jobGraph2 = new JobGraph("Long running Job", slowVertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jm = cluster.getLeaderGateway(1 seconds)
@@ -684,7 +680,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(1)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
+      val jobGraph = new JobGraph("Test Job", vertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jm = cluster.getLeaderGateway(1 seconds)
@@ -782,7 +778,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
+          val jobGraph = new JobGraph(jobVertex)
 
           // Submit job w/o checkpointing configured
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
testActor)
@@ -815,7 +811,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
+          val jobGraph = new JobGraph(jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -868,7 +864,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
+          val jobGraph = new JobGraph(jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -926,7 +922,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
+          val jobGraph = new JobGraph(jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index ea42cd1..b96369f 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -86,7 +86,8 @@ class RecoveryITCase(_system: ActorSystem)
       val executionConfig = new ExecutionConfig()
       executionConfig.setNumberOfExecutionRetries(1);
 
-      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, 
receiver)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      jobGraph.setExecutionConfig(executionConfig)
 
       val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 
s")
       cluster.start()
@@ -133,7 +134,8 @@ class RecoveryITCase(_system: ActorSystem)
       val executionConfig = new ExecutionConfig()
       executionConfig.setNumberOfExecutionRetries(1);
 
-      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, 
receiver)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      jobGraph.setExecutionConfig(executionConfig)
 
       val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s")
       cluster.start()
@@ -180,7 +182,8 @@ class RecoveryITCase(_system: ActorSystem)
       val executionConfig = new ExecutionConfig()
       executionConfig.setNumberOfExecutionRetries(1);
 
-      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, 
receiver)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      jobGraph.setExecutionConfig(executionConfig)
 
       val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s")
       cluster.start()

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 4d320ea..f986e73 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -66,7 +66,7 @@ class SlotSharingITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -110,8 +110,7 @@ class SlotSharingITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite job", new ExecutionConfig(),
-        sender1, sender2, receiver)
+      val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 100fe66..d0136f0 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -67,7 +67,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: 
ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
@@ -116,7 +116,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: 
ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c3b515e..9adabae 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -55,7 +55,6 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -110,7 +109,8 @@ public class StreamingJobGraphGenerator {
        }
 
        public JobGraph createJobGraph() {
-               jobGraph = new JobGraph(streamGraph.getJobName(), 
streamGraph.getExecutionConfig());
+
+               jobGraph = new JobGraph(streamGraph.getJobName());
 
                // make sure that all vertices start immediately
                jobGraph.setScheduleMode(ScheduleMode.ALL);
@@ -126,15 +126,11 @@ public class StreamingJobGraphGenerator {
                setPhysicalEdges();
 
                setSlotSharing();
-               
+
                configureCheckpointing();
 
-               try {
-                       // make sure that we can send the ExecutionConfig 
without user code object problems
-                       jobGraph.getExecutionConfig().serializeUserCode();
-               } catch (IOException e) {
-                       throw new IllegalStateException("Could not serialize 
ExecutionConfig.", e);
-               }
+               // set the ExecutionConfig last when it has been finalized
+               jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
 
                return jobGraph;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index 0517576..f768ace 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api;
 
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -42,7 +43,8 @@ public class RestartStrategyTest {
                StreamGraph graph = env.getStreamGraph();
                JobGraph jobGraph = graph.getJobGraph();
 
-               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getExecutionConfig().getRestartStrategy();
+               RestartStrategies.RestartStrategyConfiguration restartStrategy =
+                       
ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy();
 
                Assert.assertNotNull(restartStrategy);
                Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.FixedDelayRestartStrategyConfiguration);
@@ -64,7 +66,8 @@ public class RestartStrategyTest {
                StreamGraph graph = env.getStreamGraph();
                JobGraph jobGraph = graph.getJobGraph();
 
-               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getExecutionConfig().getRestartStrategy();
+               RestartStrategies.RestartStrategyConfiguration restartStrategy =
+                       
ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy();
 
                Assert.assertNotNull(restartStrategy);
                Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.NoRestartStrategyConfiguration);
@@ -86,7 +89,8 @@ public class RestartStrategyTest {
                StreamGraph graph = env.getStreamGraph();
                JobGraph jobGraph = graph.getJobGraph();
 
-               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getExecutionConfig().getRestartStrategy();
+               RestartStrategies.RestartStrategyConfiguration restartStrategy =
+                       
ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy();
 
                Assert.assertNotNull(restartStrategy);
                Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.FixedDelayRestartStrategyConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index e9aec48..0de4325 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -29,6 +30,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -80,17 +82,19 @@ public class StreamingJobGraphGeneratorTest {
 
                final String EXEC_CONFIG_KEY = "runtime.config";
 
-               
InstantiationUtil.writeObjectToConfig(jobGraph.getExecutionConfig(),
+               
InstantiationUtil.writeObjectToConfig(jobGraph.getSerializedExecutionConfig(),
                        jobGraph.getJobConfiguration(),
                        EXEC_CONFIG_KEY);
 
-               ExecutionConfig executionConfig = 
InstantiationUtil.readObjectFromConfig(
+               SerializedValue<ExecutionConfig> serializedExecutionConfig = 
InstantiationUtil.readObjectFromConfig(
                                jobGraph.getJobConfiguration(),
                                EXEC_CONFIG_KEY,
                                Thread.currentThread().getContextClassLoader());
-               
-               assertNotNull(executionConfig);
-               
+
+               assertNotNull(serializedExecutionConfig);
+
+               ExecutionConfig executionConfig = 
ExecutionConfigTest.deserializeConfig(serializedExecutionConfig);
+
                assertEquals(closureCleanerEnabled, 
executionConfig.isClosureCleanerEnabled());
                assertEquals(forceAvroEnabled, 
executionConfig.isForceAvroEnabled());
                assertEquals(forceKryoEnabled, 
executionConfig.isForceKryoEnabled());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 732d3e5..8484e90 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -135,7 +135,7 @@ public class RescalePartitionerTest extends TestLogger {
                        jobId,
                        jobName,
                        cfg,
-                       new ExecutionConfig(),
+                       ExecutionConfigTest.getSerializedConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy(),
                        new ArrayList<BlobKey>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 7f4492a..ed8bf01 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import akka.actor.ActorRef;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -135,7 +135,8 @@ public class StreamTaskTest {
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), new JobVertexID(), new 
ExecutionAttemptID(),
-                               new ExecutionConfig(), "Test Task", 0, 1, 0,
+                               ExecutionConfigTest.getSerializedConfig(),
+                               "Test Task", 0, 1, 0,
                                new Configuration(),
                                taskConfig.getConfiguration(),
                                invokable.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index f6c22d4..28c2e58 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.failingPrograms;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -65,7 +64,7 @@ public class JobSubmissionFailsITCase {
                        
                        final JobVertex jobVertex = new JobVertex("Working job 
vertex.");
                        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-                       workingJobGraph = new JobGraph("Working testing job", 
new ExecutionConfig(), jobVertex);
+                       workingJobGraph = new JobGraph("Working testing job", 
jobVertex);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -116,7 +115,7 @@ public class JobSubmissionFailsITCase {
                        final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
                        
failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 
-                       final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", new ExecutionConfig(), failingJobVertex);
+                       final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
 
                        try {
                                submitJob(failingJobGraph);
@@ -141,7 +140,7 @@ public class JobSubmissionFailsITCase {
        @Test
        public void testSubmitEmptyJobGraph() {
                try {
-                       final JobGraph jobGraph = new JobGraph("Testing job", 
new ExecutionConfig());
+                       final JobGraph jobGraph = new JobGraph("Testing job");
        
                        try {
                                submitJob(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f4d88a8..b9284dc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.recovery;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -368,7 +367,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                        // BLocking JobGraph
                        JobVertex blockingVertex = new JobVertex("Blocking 
vertex");
                        
blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
-                       JobGraph jobGraph = new JobGraph(new ExecutionConfig(), 
blockingVertex);
+                       JobGraph jobGraph = new JobGraph(blockingVertex);
 
                        // Submit the job in detached mode
                        leader.tell(new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED));

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 2418853..b4ffbd4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -440,7 +440,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
         * Creates a simple blocking JobGraph.
         */
        private static JobGraph createBlockingJobGraph() {
-               JobGraph jobGraph = new JobGraph("Blocking program", new 
ExecutionConfig());
+               JobGraph jobGraph = new JobGraph("Blocking program");
 
                JobVertex jobVertex = new JobVertex("Blocking Vertex");
                jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index a870578..06df46f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -97,7 +97,7 @@ public class NetworkStackThroughputITCase {
 
                private JobGraph createJobGraph(int dataVolumeGb, boolean 
useForwarder, boolean isSlowSender,
                                                                                
boolean isSlowReceiver, int numSubtasks) {
-                       JobGraph jobGraph = new JobGraph("Speed Test", new 
ExecutionConfig());
+                       JobGraph jobGraph = new JobGraph("Speed Test");
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
                        JobVertex producer = new JobVertex("Speed Test 
Producer");

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 09f9cac..45ee839 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -168,7 +167,7 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                sender.setSlotSharingGroup(slotSharingGroup);
                receiver.setSlotSharingGroup(slotSharingGroup);
 
-               final JobGraph graph = new JobGraph("Blocking test job", new 
ExecutionConfig(), sender, receiver);
+               final JobGraph graph = new JobGraph("Blocking test job", 
sender, receiver);
 
                final ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index e7b37d7..9cc90a1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import org.apache.commons.io.FileUtils;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -213,7 +212,7 @@ public class WebFrontendITCase {
                sender.setParallelism(2);
                sender.setInvokableClass(StoppableInvokable.class);
 
-               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", new ExecutionConfig(), sender);
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", sender);
                final JobID jid = jobGraph.getJobID();
 
                cluster.submitJobDetached(jobGraph);
@@ -251,7 +250,7 @@ public class WebFrontendITCase {
                sender.setParallelism(2);
                sender.setInvokableClass(StoppableInvokable.class);
 
-               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", new ExecutionConfig(), sender);
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", sender);
                final JobID jid = jobGraph.getJobID();
 
                cluster.submitJobDetached(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 2265b3b..ac661f3 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, 
NoOpInvokable}
@@ -94,12 +94,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
       val sender = new JobVertex("BlockingSender")
       sender.setParallelism(num_slots)
       sender.setInvokableClass(classOf[BlockingNoOpInvokable])
-      val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), 
sender)
+      val jobGraph = new JobGraph("Blocking Testjob", sender)
 
       val noOp = new JobVertex("NoOpInvokable")
       noOp.setParallelism(num_slots)
       noOp.setInvokableClass(classOf[NoOpInvokable])
-      val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp)
+      val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
 
       val cluster = startDeathwatchCluster(num_slots / 2, 2)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
index 9aa1e94..78bc0ee 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
@@ -65,7 +65,7 @@ class JobManagerLeaderSessionIDITSuite(_system: ActorSystem)
     val sender = new JobVertex("BlockingSender");
     sender.setParallelism(numSlots)
     sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable])
-    val jobGraph = new JobGraph("TestJob", new ExecutionConfig(), sender)
+    val jobGraph = new JobGraph("TestJob", sender)
 
     val oldSessionID = UUID.randomUUID()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index f1e115d..258f6df 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,21 +20,19 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
-
+import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
+import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, 
JobGraph}
-import org.apache.flink.runtime.jobmanager.Tasks.{NoOpInvokable, 
BlockingNoOpInvokable, BlockingReceiver, Sender}
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, 
JobVertex}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, 
BlockingReceiver, NoOpInvokable, Sender}
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import 
org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobManager, 
NotifyWhenRegisteredAtJobManager}
+import 
org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager,
 RegisteredAtJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
-
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -99,7 +97,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setParallelism(num_tasks)
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
@@ -151,7 +149,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setParallelism(num_tasks)
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
@@ -190,12 +188,12 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val sender = new JobVertex("BlockingSender")
       sender.setParallelism(num_slots)
       sender.setInvokableClass(classOf[BlockingNoOpInvokable])
-      val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), 
sender)
+      val jobGraph = new JobGraph("Blocking Testjob", sender)
 
       val noOp = new JobVertex("NoOpInvokable")
       noOp.setParallelism(num_slots)
       noOp.setInvokableClass(classOf[NoOpInvokable])
-      val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp)
+      val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
 
       val cluster = createDeathwatchCluster(num_slots/2, 2)
 

Reply via email to