Repository: flink
Updated Branches:
  refs/heads/master d0a390f9b -> 0f8d76c6f


http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index adaff29..1cd01ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -105,7 +106,7 @@ public class JobSubmitTest {
                        // create a simple job graph
                        JobVertex jobVertex = new JobVertex("Test Vertex");
                        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
 
                        // request the blob port from the job manager
                        Future<Object> future = 
jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
@@ -169,7 +170,7 @@ public class JobSubmitTest {
                        };
 
                        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
 
                        // submit the job
                        Future<Object> submitFuture = jmGateway.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 1733406..dfb0b91 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -103,7 +104,7 @@ public class SlotCountExceedingParallelismTest {
                                DistributionPattern.ALL_TO_ALL,
                                ResultPartitionType.BLOCKING);
 
-               final JobGraph jobGraph = new JobGraph(jobName, sender, 
receiver);
+               final JobGraph jobGraph = new JobGraph(jobName, new 
ExecutionConfig(), sender, receiver);
 
                // We need to allow queued scheduling, because there are not 
enough slots available
                // to run all tasks at once. We queue tasks and then let them 
finish/consume the blocking

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index 753e7be..ca2ecf5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -37,7 +38,7 @@ public class StandaloneSubmittedJobGraphStoreTest {
                StandaloneSubmittedJobGraphStore jobGraphs = new 
StandaloneSubmittedJobGraphStore();
 
                SubmittedJobGraph jobGraph = new SubmittedJobGraph(
-                               new JobGraph("testNoOps"),
+                               new JobGraph("testNoOps", new 
ExecutionConfig()),
                                new JobInfo(ActorRef.noSender(), 
ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
 
                assertEquals(0, jobGraphs.recoverJobGraphs().size());

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 356ba36..5e53596 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -260,7 +261,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
        // 
---------------------------------------------------------------------------------------------
 
        private SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long 
start) {
-               final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph");
+               final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph", 
new ExecutionConfig());
 
                final JobVertex jobVertex = new JobVertex("Test JobVertex");
                jobVertex.setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index eb4d96f..07fc2c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import com.google.common.collect.Lists;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -112,6 +113,7 @@ public class ScheduleOrUpdateConsumersTest {
 
                final JobGraph jobGraph = new JobGraph(
                                "Mixed pipelined and blocking result",
+                               new ExecutionConfig(),
                                sender,
                                pipelinedReceiver,
                                blockingReceiver);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index c490a64..f14d62f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -268,6 +269,6 @@ public class LeaderChangeStateCleanupTest extends 
TestLogger {
                sender.setSlotSharingGroup(slotSharingGroup);
                receiver.setSlotSharingGroup(slotSharingGroup);
 
-               return new JobGraph("Blocking test job", sender, receiver);
+               return new JobGraph("Blocking test job", new ExecutionConfig(), 
sender, receiver);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 3fcc425..233dace 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -44,12 +45,18 @@ public class DummyEnvironment implements Environment {
        private final TaskInfo taskInfo;
        private final JobID jobId = new JobID();
        private final JobVertexID jobVertexId = new JobVertexID();
+       private final ExecutionConfig executionConfig = new ExecutionConfig();
 
        public DummyEnvironment(String taskName, int numSubTasks, int 
subTaskIndex) {
                this.taskInfo = new TaskInfo(taskName, subTaskIndex, 
numSubTasks, 0);
        }
 
        @Override
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
+       }
+
+       @Override
        public JobID getJobID() {
                return jobId;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index fa97210..d29b206 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -65,6 +66,8 @@ public class MockEnvironment implements Environment {
        
        private final TaskInfo taskInfo;
        
+       private final ExecutionConfig executionConfig;
+
        private final MemoryManager memManager;
 
        private final IOManager ioManager;
@@ -96,6 +99,7 @@ public class MockEnvironment implements Environment {
 
                this.memManager = new MemoryManager(memorySize, 1);
                this.ioManager = new IOManagerAsync();
+               this.executionConfig = new ExecutionConfig();
                this.inputSplitProvider = inputSplitProvider;
                this.bufferSize = bufferSize;
 
@@ -186,6 +190,11 @@ public class MockEnvironment implements Environment {
        }
 
        @Override
+       public ExecutionConfig getExecutionConfig() {
+               return this.executionConfig;
+       }
+
+       @Override
        public JobID getJobID() {
                return this.jobID;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e5ff7b1..9d33920 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -148,7 +149,7 @@ public class TaskAsyncCallTest {
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), new JobVertexID(), new 
ExecutionAttemptID(),
-                               "Test Task", 0, 1, 0,
+                               new ExecutionConfig(), "Test Task", 0, 1, 0,
                                new Configuration(), new Configuration(),
                                CheckpointsInOrderInvokable.class.getName(),
                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index a3d1883..e7f4c5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -73,7 +74,7 @@ public class TaskCancelTest {
                        flink.start();
 
                        // Setup
-                       final JobGraph jobGraph = new JobGraph("Cancel Big 
Union");
+                       final JobGraph jobGraph = new JobGraph("Cancel Big 
Union", new ExecutionConfig());
 
                        JobVertex[] sources = new JobVertex[numberOfSources];
                        SlotSharingGroup group = new SlotSharingGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index b80cb0e..56c2e43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -157,9 +158,11 @@ public class TaskManagerTest extends TestLogger {
                                final JobID jid = new JobID();
                                final JobVertexID vid = new JobVertexID();
                                final ExecutionAttemptID eid = new 
ExecutionAttemptID();
+                               final ExecutionConfig executionConfig = new 
ExecutionConfig();
 
-                               final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7, 0,
-                                               new Configuration(), new 
Configuration(), TestInvokableCorrect.class.getName(),
+                               final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, vid, eid, executionConfig,
+                                               "TestTask", 2, 7, 0, new 
Configuration(), new Configuration(),
+                                               
TestInvokableCorrect.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
@@ -258,13 +261,15 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1,
+                                               new ExecutionConfig(), 
"TestTask1", 1, 5, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2,
+                                               new ExecutionConfig(), 
"TestTask2", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -390,14 +395,14 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0,
-                                               new Configuration(), new 
Configuration(), StoppableInvokable.class.getName(),
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1, new ExecutionConfig(),
+                                               "TestTask1", 1, 5, 0, new 
Configuration(), new Configuration(), StoppableInvokable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0,
-                                               new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2, new ExecutionConfig(),
+                                               "TestTask2", 2, 7, 0, new 
Configuration(), new Configuration(), 
TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
@@ -516,13 +521,15 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1,
+                                               new ExecutionConfig(), 
"Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2,
+                                               new ExecutionConfig(), 
"Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -615,12 +622,14 @@ public class TaskManagerTest extends TestLogger {
                                                                }
                                                );
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1,
+                                               new ExecutionConfig(), 
"Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(), new 
ArrayList<BlobKey>(),
                                                Collections.<URL>emptyList(), 
0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2,
+                                               new ExecutionConfig(), 
"Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.singletonList(ircdd),
@@ -754,12 +763,14 @@ public class TaskManagerTest extends TestLogger {
                                                                }
                                                );
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, vid1, eid1,
+                                               new ExecutionConfig(), 
"Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, vid2, eid2,
+                                               new ExecutionConfig(), 
"Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.BlockingReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.singletonList(ircdd),
@@ -897,7 +908,8 @@ public class TaskManagerTest extends TestLogger {
                                                new 
InputGateDeploymentDescriptor(resultId, 0, icdd);
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                               jid, vid, eid, "Receiver", 0, 
1, 0,
+                                               jid, vid, eid,
+                                               new ExecutionConfig(), 
"Receiver", 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
                                                
Tasks.AgnosticReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -991,7 +1003,7 @@ public class TaskManagerTest extends TestLogger {
                                                new 
InputGateDeploymentDescriptor(resultId, 0, icdd);
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                               jid, vid, eid, "Receiver", 0, 
1, 0,
+                                               jid, vid, eid, new 
ExecutionConfig(), "Receiver", 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
                                                
Tasks.AgnosticReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -1067,6 +1079,7 @@ public class TaskManagerTest extends TestLogger {
                                                new JobID(),
                                                new JobVertexID(),
                                                new ExecutionAttemptID(),
+                                               new ExecutionConfig(),
                                                "Task",
                                                0,
                                                1,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index a60e074..aa37d47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -61,6 +62,7 @@ public class TaskStopTest {
                
when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
                
when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
                
when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+               
when(tddMock.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
                when(tddMock.getInvokableClassName()).thenReturn("className");
 
                task = new Task(tddMock, mock(MemoryManager.class), 
mock(IOManager.class), mock(NetworkEnvironment.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 45ca364..034681e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -627,7 +628,7 @@ public class TaskTest {
        private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? 
extends AbstractInvokable> invokable) {
                return new TaskDeploymentDescriptor(
                                new JobID(), new JobVertexID(), new 
ExecutionAttemptID(),
-                               "Test Task", 0, 1, 0,
+                               new ExecutionConfig(), "Test Task", 0, 1, 0,
                                new Configuration(), new Configuration(),
                                invokable.getName(),
                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index dbd87d0..ef3dae4 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph
 
-import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
@@ -50,13 +50,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike 
with Matchers {
         sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
         sender.setParallelism(20)
 
-        val jobGraph = new JobGraph("Pointwise job", sender)
+        val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender)
 
         val eg = new ExecutionGraph(
           TestingUtils.defaultExecutionContext,
           new JobID(),
           "test job",
           new Configuration(),
+          new ExecutionConfig,
           AkkaUtils.getDefaultTimeout,
           new NoRestartStrategy())
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index a0c144a..f52d37e 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, 
JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
@@ -67,7 +68,7 @@ class CoLocationConstraintITCase(_system: ActorSystem)
 
       receiver.setStrictlyCoLocatedWith(sender)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index ec54b7e..894ba38 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
-import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, 
SavepointCoordinator}
 import org.apache.flink.runtime.client.JobExecutionException
@@ -68,7 +68,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(2)
       vertex.setInvokableClass(classOf[BlockingNoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", vertex)
+      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -110,7 +110,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", vertex)
+      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -145,7 +145,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test job", vertex)
+      val jobGraph = new JobGraph("Test job", new ExecutionConfig(), vertex)
       jobGraph.setAllowQueuedScheduling(true)
 
       val cluster = TestingUtils.startTestingCluster(10)
@@ -181,7 +181,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -216,7 +216,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
+      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -253,7 +253,8 @@ class JobManagerITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(),
+        sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -298,7 +299,8 @@ class JobManagerITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(),
+        sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -340,7 +342,8 @@ class JobManagerITCase(_system: ActorSystem)
       forwarder.connectNewDataSetAsInput(sender, 
DistributionPattern.ALL_TO_ALL)
       receiver.connectNewDataSetAsInput(forwarder, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, 
receiver)
+      val jobGraph = new JobGraph("Forwarding Job", new ExecutionConfig(),
+        sender, forwarder, receiver)
 
       jobGraph.setScheduleMode(ScheduleMode.ALL)
 
@@ -376,7 +379,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -424,7 +427,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -469,7 +472,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -509,7 +512,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -557,7 +560,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -600,7 +603,8 @@ class JobManagerITCase(_system: ActorSystem)
       source.setParallelism(num_tasks)
       sink.setParallelism(num_tasks)
 
-      val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, 
sink)
+      val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition",
+        new ExecutionConfig(), source, sink)
 
       val cluster = TestingUtils.startTestingCluster(2*num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -626,12 +630,12 @@ class JobManagerITCase(_system: ActorSystem)
       val vertex = new JobVertex("Test Vertex")
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph1 = new JobGraph("Test Job", vertex)
+      val jobGraph1 = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 
2000)
       slowVertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph2 = new JobGraph("Long running Job", slowVertex)
+      val jobGraph2 = new JobGraph("Long running Job", new ExecutionConfig(), 
slowVertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jm = cluster.getLeaderGateway(1 seconds)
@@ -680,7 +684,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(1)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", vertex)
+      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jm = cluster.getLeaderGateway(1 seconds)
@@ -778,7 +782,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
 
           // Submit job w/o checkpointing configured
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
testActor)
@@ -811,7 +815,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -864,7 +868,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -922,7 +926,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 41b6702..ea42cd1 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.{PoisonPill, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, 
DistributionPattern, JobVertex}
@@ -82,7 +83,10 @@ class RecoveryITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val executionConfig = new ExecutionConfig()
+      executionConfig.setNumberOfExecutionRetries(1);
+
+      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, 
receiver)
 
       val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 
s")
       cluster.start()
@@ -126,7 +130,10 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val executionConfig = new ExecutionConfig()
+      executionConfig.setNumberOfExecutionRetries(1);
+
+      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, 
receiver)
 
       val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s")
       cluster.start()
@@ -170,7 +177,10 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val executionConfig = new ExecutionConfig()
+      executionConfig.setNumberOfExecutionRetries(1);
+
+      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, 
receiver)
 
       val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s")
       cluster.start()

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index a6d60dd..4d320ea 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, 
JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{Sender, 
AgnosticBinaryReceiver, Receiver}
@@ -65,7 +66,7 @@ class SlotSharingITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -109,7 +110,8 @@ class SlotSharingITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
+      val jobGraph = new JobGraph("Bipartite job", new ExecutionConfig(),
+        sender1, sender2, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 49a1c95..c108596 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, 
JobGraph}
@@ -28,7 +29,6 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, 
JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.util.SerializedThrowable
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -67,7 +67,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: 
ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
@@ -116,7 +116,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: 
ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e3e1ac6..c339a07 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -24,7 +24,6 @@ import com.google.common.hash.Hashing;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -52,12 +51,10 @@ import 
org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -112,7 +109,7 @@ public class StreamingJobGraphGenerator {
        }
 
        public JobGraph createJobGraph() {
-               jobGraph = new JobGraph(streamGraph.getJobName());
+               jobGraph = new JobGraph(streamGraph.getJobName(), 
streamGraph.getExecutionConfig());
 
                // make sure that all vertices start immediately
                jobGraph.setScheduleMode(ScheduleMode.ALL);
@@ -133,12 +130,6 @@ public class StreamingJobGraphGenerator {
 
                configureRestartStrategy();
 
-               try {
-                       
InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), 
this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
-               } catch (IOException e) {
-                       throw new RuntimeException("Config object could not be 
written to Job Configuration: ", e);
-               }
-               
                return jobGraph;
        }
 
@@ -494,7 +485,8 @@ public class StreamingJobGraphGenerator {
        }
 
        private void configureRestartStrategy() {
-               
jobGraph.setRestartStrategyConfiguration(streamGraph.getExecutionConfig().getRestartStrategy());
+               jobGraph.getExecutionConfig().setRestartStrategy(
+                       streamGraph.getExecutionConfig().getRestartStrategy());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index 9f75727..0517576 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -42,7 +42,7 @@ public class RestartStrategyTest {
                StreamGraph graph = env.getStreamGraph();
                JobGraph jobGraph = graph.getJobGraph();
 
-               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getRestartStrategyConfiguration();
+               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getExecutionConfig().getRestartStrategy();
 
                Assert.assertNotNull(restartStrategy);
                Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.FixedDelayRestartStrategyConfiguration);
@@ -64,7 +64,7 @@ public class RestartStrategyTest {
                StreamGraph graph = env.getStreamGraph();
                JobGraph jobGraph = graph.getJobGraph();
 
-               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getRestartStrategyConfiguration();
+               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getExecutionConfig().getRestartStrategy();
 
                Assert.assertNotNull(restartStrategy);
                Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.NoRestartStrategyConfiguration);
@@ -86,7 +86,7 @@ public class RestartStrategyTest {
                StreamGraph graph = env.getStreamGraph();
                JobGraph jobGraph = graph.getJobGraph();
 
-               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getRestartStrategyConfiguration();
+               RestartStrategies.RestartStrategyConfiguration restartStrategy 
= jobGraph.getExecutionConfig().getRestartStrategy();
 
                Assert.assertNotNull(restartStrategy);
                Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.FixedDelayRestartStrategyConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 8a814ff..e9aec48 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -77,10 +77,16 @@ public class StreamingJobGraphGeneratorTest {
                config.setParallelism(dop);
                
                JobGraph jobGraph = compiler.createJobGraph();
-               
+
+               final String EXEC_CONFIG_KEY = "runtime.config";
+
+               
InstantiationUtil.writeObjectToConfig(jobGraph.getExecutionConfig(),
+                       jobGraph.getJobConfiguration(),
+                       EXEC_CONFIG_KEY);
+
                ExecutionConfig executionConfig = 
InstantiationUtil.readObjectFromConfig(
                                jobGraph.getJobConfiguration(),
-                               ExecutionConfig.CONFIG_KEY,
+                               EXEC_CONFIG_KEY,
                                Thread.currentThread().getContextClassLoader());
                
                assertNotNull(executionConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 184f87e..732d3e5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -134,6 +135,7 @@ public class RescalePartitionerTest extends TestLogger {
                        jobId,
                        jobName,
                        cfg,
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy(),
                        new ArrayList<BlobKey>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index c4b74e8..f91353e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -90,8 +91,10 @@ public class StreamMockEnvironment implements Environment {
 
        private final int bufferSize;
 
-       public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, long memorySize,
-                                                                       
MockInputSplitProvider inputSplitProvider, int bufferSize) {
+       private final ExecutionConfig executionConfig;
+
+       public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, ExecutionConfig executionConfig,
+                                                                       long 
memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
                this.taskInfo = new TaskInfo("", 0, 1, 0);
                this.jobConfiguration = jobConfig;
                this.taskConfiguration = taskConfig;
@@ -103,9 +106,15 @@ public class StreamMockEnvironment implements Environment {
                this.inputSplitProvider = inputSplitProvider;
                this.bufferSize = bufferSize;
 
+               this.executionConfig = executionConfig;
                this.accumulatorRegistry = new AccumulatorRegistry(jobID, 
getExecutionId());
        }
 
+       public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, long memorySize,
+                                                                
MockInputSplitProvider inputSplitProvider, int bufferSize) {
+               this(jobConfig, taskConfig, null, memorySize, 
inputSplitProvider, bufferSize);
+       }
+
        public void addInputGate(InputGate gate) {
                inputs.add(gate);
        }
@@ -206,6 +215,11 @@ public class StreamMockEnvironment implements Environment {
        }
 
        @Override
+       public ExecutionConfig getExecutionConfig() {
+               return this.executionConfig;
+       }
+
+       @Override
        public JobID getJobID() {
                return this.jobID;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index 1830054..ed1dd60 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -77,6 +78,11 @@ public class StreamTaskAsyncCheckpointTest {
                        testHarness.bufferSize) {
 
                        @Override
+                       public ExecutionConfig getExecutionConfig() {
+                               return testHarness.executionConfig;
+                       }
+
+                       @Override
                        public void acknowledgeCheckpoint(long checkpointId) {
                                super.acknowledgeCheckpoint(checkpointId);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 7cc58b5..7f4492a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import akka.actor.ActorRef;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -134,7 +135,7 @@ public class StreamTaskTest {
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), new JobVertexID(), new 
ExecutionAttemptID(),
-                               "Test Task", 0, 1, 0,
+                               new ExecutionConfig(), "Test Task", 0, 1, 0,
                                new Configuration(),
                                taskConfig.getConfiguration(),
                                invokable.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 8dc7edd..e750f6f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -104,12 +103,6 @@ public class StreamTaskTestHarness<OUT> {
                this.jobConfig = new Configuration();
                this.taskConfig = new Configuration();
                this.executionConfig = new ExecutionConfig();
-               
-               try {
-                       InstantiationUtil.writeObjectToConfig(executionConfig, 
this.jobConfig, ExecutionConfig.CONFIG_KEY);
-               } catch (IOException e) {
-                       throw new RuntimeException(e);
-               }
 
                streamConfig = new StreamConfig(taskConfig);
                streamConfig.setChainStart();
@@ -156,7 +149,8 @@ public class StreamTaskTestHarness<OUT> {
         * Task thread to finish running.
         */
        public void invoke() throws Exception {
-               mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, 
memorySize, new MockInputSplitProvider(), bufferSize);
+               mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, 
executionConfig,
+                       memorySize, new MockInputSplitProvider(), bufferSize);
                task.setEnvironment(mockEnv);
 
                initializeInputs();

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 28c2e58..f6c22d4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.failingPrograms;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -64,7 +65,7 @@ public class JobSubmissionFailsITCase {
                        
                        final JobVertex jobVertex = new JobVertex("Working job 
vertex.");
                        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-                       workingJobGraph = new JobGraph("Working testing job", 
jobVertex);
+                       workingJobGraph = new JobGraph("Working testing job", 
new ExecutionConfig(), jobVertex);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -115,7 +116,7 @@ public class JobSubmissionFailsITCase {
                        final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
                        
failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 
-                       final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
+                       final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", new ExecutionConfig(), failingJobVertex);
 
                        try {
                                submitJob(failingJobGraph);
@@ -140,7 +141,7 @@ public class JobSubmissionFailsITCase {
        @Test
        public void testSubmitEmptyJobGraph() {
                try {
-                       final JobGraph jobGraph = new JobGraph("Testing job");
+                       final JobGraph jobGraph = new JobGraph("Testing job", 
new ExecutionConfig());
        
                        try {
                                submitJob(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index e5a494b..c1bd5e2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -68,6 +68,39 @@ public class MapITCase extends MultipleProgramsTestBase {
                compareResultAsText(result, expected);
        }
 
+       @Test
+       public void testRuntimeContextAndExecutionConfigParams() throws 
Exception {
+               /*
+                * Test identity map with basic type
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().setNumberOfExecutionRetries(1000);
+
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               DataSet<String> identityMapDs = ds.
+                       map(new RichMapFunction<String, String>() {
+                               @Override
+                               public String map(String value) throws 
Exception {
+                                       Assert.assertTrue(1000 == 
getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries());
+                                       return value;
+                               }
+                       });
+
+               List<String> result = identityMapDs.collect();
+
+               String expected = "Hi\n" +
+                       "Hello\n" +
+                       "Hello world\n" +
+                       "Hello world, how are you?\n" +
+                       "I am fine.\n" +
+                       "Luke Skywalker\n" +
+                       "Random comment\n" +
+                       "LOL\n";
+
+               compareResultAsText(result, expected);
+       }
+
        public static class Mapper1 implements MapFunction<String, String> {
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 5783fcc..e8ad527 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.recovery;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -366,7 +367,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                        // BLocking JobGraph
                        JobVertex blockingVertex = new JobVertex("Blocking 
vertex");
                        
blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
-                       JobGraph jobGraph = new JobGraph(blockingVertex);
+                       JobGraph jobGraph = new JobGraph(new ExecutionConfig(), 
blockingVertex);
 
                        // Submit the job in detached mode
                        leader.tell(new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED));

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 5080aa2..32423be 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -25,6 +25,7 @@ import akka.actor.UntypedActor;
 import akka.testkit.TestActorRef;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -428,7 +429,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
         * Creates a simple blocking JobGraph.
         */
        private static JobGraph createBlockingJobGraph() {
-               JobGraph jobGraph = new JobGraph("Blocking program");
+               JobGraph jobGraph = new JobGraph("Blocking program", new 
ExecutionConfig());
 
                JobVertex jobVertex = new JobVertex("Blocking Vertex");
                jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index aada364..a870578 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.runtime;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -96,7 +97,7 @@ public class NetworkStackThroughputITCase {
 
                private JobGraph createJobGraph(int dataVolumeGb, boolean 
useForwarder, boolean isSlowSender,
                                                                                
boolean isSlowReceiver, int numSubtasks) {
-                       JobGraph jobGraph = new JobGraph("Speed Test");
+                       JobGraph jobGraph = new JobGraph("Speed Test", new 
ExecutionConfig());
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
                        JobVertex producer = new JobVertex("Speed Test 
Producer");

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 45ee839..09f9cac 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -167,7 +168,7 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                sender.setSlotSharingGroup(slotSharingGroup);
                receiver.setSlotSharingGroup(slotSharingGroup);
 
-               final JobGraph graph = new JobGraph("Blocking test job", 
sender, receiver);
+               final JobGraph graph = new JobGraph("Blocking test job", new 
ExecutionConfig(), sender, receiver);
 
                final ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 2586a27..48888a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import org.apache.commons.io.FileUtils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -178,7 +179,7 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                sender.setParallelism(2);
                sender.setInvokableClass(StoppableInvokable.class);
 
-               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", sender);
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", new ExecutionConfig(), sender);
                final JobID jid = jobGraph.getJobID();
 
                cluster.submitJobDetached(jobGraph);
@@ -206,7 +207,7 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                sender.setParallelism(2);
                sender.setInvokableClass(StoppableInvokable.class);
 
-               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", sender);
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", new ExecutionConfig(), sender);
                final JobID jid = jobGraph.getJobID();
 
                cluster.submitJobDetached(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index fddf639..2265b3b 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, 
NoOpInvokable}
@@ -93,12 +94,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
       val sender = new JobVertex("BlockingSender")
       sender.setParallelism(num_slots)
       sender.setInvokableClass(classOf[BlockingNoOpInvokable])
-      val jobGraph = new JobGraph("Blocking Testjob", sender)
+      val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), 
sender)
 
       val noOp = new JobVertex("NoOpInvokable")
       noOp.setParallelism(num_slots)
       noOp.setInvokableClass(classOf[NoOpInvokable])
-      val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
+      val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp)
 
       val cluster = startDeathwatchCluster(num_slots / 2, 2)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
index 09af430..9aa1e94 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
@@ -64,7 +65,7 @@ class JobManagerLeaderSessionIDITSuite(_system: ActorSystem)
     val sender = new JobVertex("BlockingSender");
     sender.setParallelism(numSlots)
     sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable])
-    val jobGraph = new JobGraph("TestJob", sender)
+    val jobGraph = new JobGraph("TestJob", new ExecutionConfig(), sender)
 
     val oldSessionID = UUID.randomUUID()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 869af82..88d760d 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.scala.runtime.taskmanager
 
-import akka.actor.Status.{Failure, Success}
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
@@ -33,7 +33,6 @@ import 
org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobMan
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.util.SerializedThrowable
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
 
 import org.junit.runner.RunWith
@@ -100,7 +99,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setParallelism(num_tasks)
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
@@ -152,7 +151,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setParallelism(num_tasks)
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), 
sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
@@ -191,12 +190,12 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val sender = new JobVertex("BlockingSender")
       sender.setParallelism(num_slots)
       sender.setInvokableClass(classOf[BlockingNoOpInvokable])
-      val jobGraph = new JobGraph("Blocking Testjob", sender)
+      val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), 
sender)
 
       val noOp = new JobVertex("NoOpInvokable")
       noOp.setParallelism(num_slots)
       noOp.setInvokableClass(classOf[NoOpInvokable])
-      val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
+      val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp)
 
       val cluster = createDeathwatchCluster(num_slots/2, 2)
 

Reply via email to