Repository: flink Updated Branches: refs/heads/master d0a390f9b -> 0f8d76c6f
http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index adaff29..1cd01ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -105,7 +106,7 @@ public class JobSubmitTest { // create a simple job graph JobVertex jobVertex = new JobVertex("Test Vertex"); jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - JobGraph jg = new JobGraph("test job", jobVertex); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); // request the blob port from the job manager Future<Object> future = jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout); @@ -169,7 +170,7 @@ public class JobSubmitTest { }; jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - JobGraph jg = new JobGraph("test job", jobVertex); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); // submit the job Future<Object> submitFuture = jmGateway.ask( http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 1733406..dfb0b91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmanager; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.io.network.api.reader.RecordReader; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -103,7 +104,7 @@ public class SlotCountExceedingParallelismTest { DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - final JobGraph jobGraph = new JobGraph(jobName, sender, receiver); + final JobGraph jobGraph = new JobGraph(jobName, new ExecutionConfig(), sender, receiver); // We need to allow queued scheduling, because there are not enough slots available // to run all tasks at once. We queue tasks and then let them finish/consume the blocking http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java index 753e7be..ca2ecf5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager; import akka.actor.ActorRef; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -37,7 +38,7 @@ public class StandaloneSubmittedJobGraphStoreTest { StandaloneSubmittedJobGraphStore jobGraphs = new StandaloneSubmittedJobGraphStore(); SubmittedJobGraph jobGraph = new SubmittedJobGraph( - new JobGraph("testNoOps"), + new JobGraph("testNoOps", new ExecutionConfig()), new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE)); assertEquals(0, jobGraphs.recoverJobGraphs().size()); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index 356ba36..5e53596 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager; import akka.actor.ActorRef; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -260,7 +261,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { // --------------------------------------------------------------------------------------------- private SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long start) { - final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph"); + final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph", new ExecutionConfig()); final JobVertex jobVertex = new JobVertex("Test JobVertex"); jobVertex.setParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java index eb4d96f..07fc2c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; import com.google.common.collect.Lists; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -112,6 +113,7 @@ public class ScheduleOrUpdateConsumersTest { final JobGraph jobGraph = new JobGraph( "Mixed pipelined and blocking result", + new ExecutionConfig(), sender, pipelinedReceiver, blockingReceiver); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index c490a64..f14d62f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.instance.ActorGateway; @@ -268,6 +269,6 @@ public class LeaderChangeStateCleanupTest extends TestLogger { sender.setSlotSharingGroup(slotSharingGroup); receiver.setSlotSharingGroup(slotSharingGroup); - return new JobGraph("Blocking test job", sender, receiver); + return new JobGraph("Blocking test job", new ExecutionConfig(), sender, receiver); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 3fcc425..233dace 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.Future; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; @@ -44,12 +45,18 @@ public class DummyEnvironment implements Environment { private final TaskInfo taskInfo; private final JobID jobId = new JobID(); private final JobVertexID jobVertexId = new JobVertexID(); + private final ExecutionConfig executionConfig = new ExecutionConfig(); public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) { this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0); } @Override + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + @Override public JobID getJobID() { return jobId; } http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index fa97210..d29b206 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.testutils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; @@ -65,6 +66,8 @@ public class MockEnvironment implements Environment { private final TaskInfo taskInfo; + private final ExecutionConfig executionConfig; + private final MemoryManager memManager; private final IOManager ioManager; @@ -96,6 +99,7 @@ public class MockEnvironment implements Environment { this.memManager = new MemoryManager(memorySize, 1); this.ioManager = new IOManagerAsync(); + this.executionConfig = new ExecutionConfig(); this.inputSplitProvider = inputSplitProvider; this.bufferSize = bufferSize; @@ -186,6 +190,11 @@ public class MockEnvironment implements Environment { } @Override + public ExecutionConfig getExecutionConfig() { + return this.executionConfig; + } + + @Override public JobID getJobID() { return this.jobID; } http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index e5ff7b1..9d33920 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskmanager; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; @@ -148,7 +149,7 @@ public class TaskAsyncCallTest { TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - "Test Task", 0, 1, 0, + new ExecutionConfig(), "Test Task", 0, 1, 0, new Configuration(), new Configuration(), CheckpointsInOrderInvokable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java index a3d1883..e7f4c5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskmanager; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -73,7 +74,7 @@ public class TaskCancelTest { flink.start(); // Setup - final JobGraph jobGraph = new JobGraph("Cancel Big Union"); + final JobGraph jobGraph = new JobGraph("Cancel Big Union", new ExecutionConfig()); JobVertex[] sources = new JobVertex[numberOfSources]; SlotSharingGroup group = new SlotSharingGroup(); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index b80cb0e..56c2e43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -23,6 +23,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -157,9 +158,11 @@ public class TaskManagerTest extends TestLogger { final JobID jid = new JobID(); final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); + final ExecutionConfig executionConfig = new ExecutionConfig(); - final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7, 0, - new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(), + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, executionConfig, + "TestTask", 2, 7, 0, new Configuration(), new Configuration(), + TestInvokableCorrect.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); @@ -258,13 +261,15 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, + new ExecutionConfig(), "TestTask1", 1, 5, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, + new ExecutionConfig(), "TestTask2", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -390,14 +395,14 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0, - new Configuration(), new Configuration(), StoppableInvokable.class.getName(), + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, new ExecutionConfig(), + "TestTask1", 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0, - new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, new ExecutionConfig(), + "TestTask2", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); @@ -516,13 +521,15 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, + new ExecutionConfig(), "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, + new ExecutionConfig(), "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -615,12 +622,14 @@ public class TaskManagerTest extends TestLogger { } ); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, + new ExecutionConfig(), "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, + new ExecutionConfig(), "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.singletonList(ircdd), @@ -754,12 +763,14 @@ public class TaskManagerTest extends TestLogger { } ); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, + new ExecutionConfig(), "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, + new ExecutionConfig(), "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.singletonList(ircdd), @@ -897,7 +908,8 @@ public class TaskManagerTest extends TestLogger { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, vid, eid, "Receiver", 0, 1, 0, + jid, vid, eid, + new ExecutionConfig(), "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), @@ -991,7 +1003,7 @@ public class TaskManagerTest extends TestLogger { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, vid, eid, "Receiver", 0, 1, 0, + jid, vid, eid, new ExecutionConfig(), "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), @@ -1067,6 +1079,7 @@ public class TaskManagerTest extends TestLogger { new JobID(), new JobVertexID(), new ExecutionAttemptID(), + new ExecutionConfig(), "Task", 0, 1, http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index a60e074..aa37d47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -17,6 +17,7 @@ */ package org.apache.flink.runtime.taskmanager; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; @@ -61,6 +62,7 @@ public class TaskStopTest { when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class)); when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class)); when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); when(tddMock.getInvokableClassName()).thenReturn("className"); task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 45ca364..034681e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import com.google.common.collect.Maps; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -627,7 +628,7 @@ public class TaskTest { private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) { return new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - "Test Task", 0, 1, 0, + new ExecutionConfig(), "Test Task", 0, 1, 0, new Configuration(), new Configuration(), invokable.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index dbd87d0..ef3dae4 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.executiongraph -import org.apache.flink.api.common.JobID +import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway @@ -50,13 +50,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { sender.setInvokableClass(classOf[Tasks.NoOpInvokable]) sender.setParallelism(20) - val jobGraph = new JobGraph("Pointwise job", sender) + val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender) val eg = new ExecutionGraph( TestingUtils.defaultExecutionContext, new JobID(), "test job", new Configuration(), + new ExecutionConfig, AkkaUtils.getDefaultTimeout, new NoRestartStrategy()) http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala index a0c144a..f52d37e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorSystem import akka.actor.Status.Success import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender} @@ -67,7 +68,7 @@ class CoLocationConstraintITCase(_system: ActorSystem) receiver.setStrictlyCoLocatedWith(sender) - val jobGraph = new JobGraph("Pointwise job", sender, receiver) + val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index ec54b7e..894ba38 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -22,7 +22,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout -import org.apache.flink.api.common.JobID +import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, SavepointCoordinator} import org.apache.flink.runtime.client.JobExecutionException @@ -68,7 +68,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(2) vertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph("Test Job", vertex) + val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex) val cluster = TestingUtils.startTestingCluster(1) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -110,7 +110,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(num_tasks) vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph = new JobGraph("Test Job", vertex) + val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -145,7 +145,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(num_tasks) vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph = new JobGraph("Test job", vertex) + val jobGraph = new JobGraph("Test job", new ExecutionConfig(), vertex) jobGraph.setAllowQueuedScheduling(true) val cluster = TestingUtils.startTestingCluster(10) @@ -181,7 +181,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(2 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -216,7 +216,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Bipartite Job", sender, receiver) + val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(2 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -253,7 +253,8 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2) + val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), + sender1, receiver, sender2) val cluster = TestingUtils.startTestingCluster(6 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -298,7 +299,8 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2) + val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), + sender1, receiver, sender2) val cluster = TestingUtils.startTestingCluster(6 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -340,7 +342,8 @@ class JobManagerITCase(_system: ActorSystem) forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL) receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver) + val jobGraph = new JobGraph("Forwarding Job", new ExecutionConfig(), + sender, forwarder, receiver) jobGraph.setScheduleMode(ScheduleMode.ALL) @@ -376,7 +379,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -424,7 +427,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -469,7 +472,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise job", sender, receiver) + val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(2 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -509,7 +512,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise job", sender, receiver) + val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -557,7 +560,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise job", sender, receiver) + val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -600,7 +603,8 @@ class JobManagerITCase(_system: ActorSystem) source.setParallelism(num_tasks) sink.setParallelism(num_tasks) - val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, sink) + val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", + new ExecutionConfig(), source, sink) val cluster = TestingUtils.startTestingCluster(2*num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -626,12 +630,12 @@ class JobManagerITCase(_system: ActorSystem) val vertex = new JobVertex("Test Vertex") vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph1 = new JobGraph("Test Job", vertex) + val jobGraph1 = new JobGraph("Test Job", new ExecutionConfig(), vertex) val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 2000) slowVertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph2 = new JobGraph("Long running Job", slowVertex) + val jobGraph2 = new JobGraph("Long running Job", new ExecutionConfig(), slowVertex) val cluster = TestingUtils.startTestingCluster(1) val jm = cluster.getLeaderGateway(1 seconds) @@ -680,7 +684,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(1) vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph = new JobGraph("Test Job", vertex) + val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex) val cluster = TestingUtils.startTestingCluster(1) val jm = cluster.getLeaderGateway(1 seconds) @@ -778,7 +782,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(jobVertex) + val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) // Submit job w/o checkpointing configured jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor) @@ -811,7 +815,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(jobVertex) + val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) jobGraph.setSnapshotSettings(new JobSnapshottingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), @@ -864,7 +868,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(jobVertex) + val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) jobGraph.setSnapshotSettings(new JobSnapshottingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), @@ -922,7 +926,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(jobVertex) + val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) jobGraph.setSnapshotSettings(new JobSnapshottingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index 41b6702..ea42cd1 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.{PoisonPill, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex} @@ -82,7 +83,10 @@ class RecoveryITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise job", sender, receiver) + val executionConfig = new ExecutionConfig() + executionConfig.setNumberOfExecutionRetries(1); + + val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver) val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s") cluster.start() @@ -126,7 +130,10 @@ class RecoveryITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise job", sender, receiver) + val executionConfig = new ExecutionConfig() + executionConfig.setNumberOfExecutionRetries(1); + + val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver) val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s") cluster.start() @@ -170,7 +177,10 @@ class RecoveryITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise job", sender, receiver) + val executionConfig = new ExecutionConfig() + executionConfig.setNumberOfExecutionRetries(1); + + val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver) val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s") cluster.start() http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala index a6d60dd..4d320ea 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorSystem import akka.actor.Status.Success import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver} @@ -65,7 +66,7 @@ class SlotSharingITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -109,7 +110,8 @@ class SlotSharingITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver) + val jobGraph = new JobGraph("Bipartite job", new ExecutionConfig(), + sender1, sender2, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala index 49a1c95..c108596 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.{Kill, ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph} @@ -28,7 +29,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.apache.flink.runtime.util.SerializedThrowable import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -67,7 +67,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val jobID = jobGraph.getJobID val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) @@ -116,7 +116,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val jobID = jobGraph.getJobID val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index e3e1ac6..c339a07 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -24,7 +24,6 @@ import com.google.common.hash.Hashing; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; @@ -52,12 +51,10 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayDeque; import java.util.ArrayList; @@ -112,7 +109,7 @@ public class StreamingJobGraphGenerator { } public JobGraph createJobGraph() { - jobGraph = new JobGraph(streamGraph.getJobName()); + jobGraph = new JobGraph(streamGraph.getJobName(), streamGraph.getExecutionConfig()); // make sure that all vertices start immediately jobGraph.setScheduleMode(ScheduleMode.ALL); @@ -133,12 +130,6 @@ public class StreamingJobGraphGenerator { configureRestartStrategy(); - try { - InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY); - } catch (IOException e) { - throw new RuntimeException("Config object could not be written to Job Configuration: ", e); - } - return jobGraph; } @@ -494,7 +485,8 @@ public class StreamingJobGraphGenerator { } private void configureRestartStrategy() { - jobGraph.setRestartStrategyConfiguration(streamGraph.getExecutionConfig().getRestartStrategy()); + jobGraph.getExecutionConfig().setRestartStrategy( + streamGraph.getExecutionConfig().getRestartStrategy()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java index 9f75727..0517576 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java @@ -42,7 +42,7 @@ public class RestartStrategyTest { StreamGraph graph = env.getStreamGraph(); JobGraph jobGraph = graph.getJobGraph(); - RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration(); + RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy(); Assert.assertNotNull(restartStrategy); Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); @@ -64,7 +64,7 @@ public class RestartStrategyTest { StreamGraph graph = env.getStreamGraph(); JobGraph jobGraph = graph.getJobGraph(); - RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration(); + RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy(); Assert.assertNotNull(restartStrategy); Assert.assertTrue(restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration); @@ -86,7 +86,7 @@ public class RestartStrategyTest { StreamGraph graph = env.getStreamGraph(); JobGraph jobGraph = graph.getJobGraph(); - RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration(); + RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy(); Assert.assertNotNull(restartStrategy); Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 8a814ff..e9aec48 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -77,10 +77,16 @@ public class StreamingJobGraphGeneratorTest { config.setParallelism(dop); JobGraph jobGraph = compiler.createJobGraph(); - + + final String EXEC_CONFIG_KEY = "runtime.config"; + + InstantiationUtil.writeObjectToConfig(jobGraph.getExecutionConfig(), + jobGraph.getJobConfiguration(), + EXEC_CONFIG_KEY); + ExecutionConfig executionConfig = InstantiationUtil.readObjectFromConfig( jobGraph.getJobConfiguration(), - ExecutionConfig.CONFIG_KEY, + EXEC_CONFIG_KEY, Thread.currentThread().getContextClassLoader()); assertNotNull(executionConfig); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index 184f87e..732d3e5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.runtime.partitioner; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple; @@ -134,6 +135,7 @@ public class RescalePartitionerTest extends TestLogger { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy(), new ArrayList<BlobKey>(), http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index c4b74e8..f91353e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -90,8 +91,10 @@ public class StreamMockEnvironment implements Environment { private final int bufferSize; - public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, - MockInputSplitProvider inputSplitProvider, int bufferSize) { + private final ExecutionConfig executionConfig; + + public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig, + long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { this.taskInfo = new TaskInfo("", 0, 1, 0); this.jobConfiguration = jobConfig; this.taskConfiguration = taskConfig; @@ -103,9 +106,15 @@ public class StreamMockEnvironment implements Environment { this.inputSplitProvider = inputSplitProvider; this.bufferSize = bufferSize; + this.executionConfig = executionConfig; this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId()); } + public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, + MockInputSplitProvider inputSplitProvider, int bufferSize) { + this(jobConfig, taskConfig, null, memorySize, inputSplitProvider, bufferSize); + } + public void addInputGate(InputGate gate) { inputs.add(gate); } @@ -206,6 +215,11 @@ public class StreamMockEnvironment implements Environment { } @Override + public ExecutionConfig getExecutionConfig() { + return this.executionConfig; + } + + @Override public JobID getJobID() { return this.jobID; } http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java index 1830054..ed1dd60 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -77,6 +78,11 @@ public class StreamTaskAsyncCheckpointTest { testHarness.bufferSize) { @Override + public ExecutionConfig getExecutionConfig() { + return testHarness.executionConfig; + } + + @Override public void acknowledgeCheckpoint(long checkpointId) { super.acknowledgeCheckpoint(checkpointId); } http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 7cc58b5..7f4492a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import akka.actor.ActorRef; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; @@ -134,7 +135,7 @@ public class StreamTaskTest { TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - "Test Task", 0, 1, 0, + new ExecutionConfig(), "Test Task", 0, 1, 0, new Configuration(), taskConfig.getConfiguration(), invokable.getName(), http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 8dc7edd..e750f6f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import org.apache.flink.util.InstantiationUtil; import org.junit.Assert; import java.io.IOException; @@ -104,12 +103,6 @@ public class StreamTaskTestHarness<OUT> { this.jobConfig = new Configuration(); this.taskConfig = new Configuration(); this.executionConfig = new ExecutionConfig(); - - try { - InstantiationUtil.writeObjectToConfig(executionConfig, this.jobConfig, ExecutionConfig.CONFIG_KEY); - } catch (IOException e) { - throw new RuntimeException(e); - } streamConfig = new StreamConfig(taskConfig); streamConfig.setChainStart(); @@ -156,7 +149,8 @@ public class StreamTaskTestHarness<OUT> { * Task thread to finish running. */ public void invoke() throws Exception { - mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, memorySize, new MockInputSplitProvider(), bufferSize); + mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig, + memorySize, new MockInputSplitProvider(), bufferSize); task.setEnvironment(mockEnv); initializeInputs(); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java index 28c2e58..f6c22d4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.failingPrograms; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -64,7 +65,7 @@ public class JobSubmissionFailsITCase { final JobVertex jobVertex = new JobVertex("Working job vertex."); jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - workingJobGraph = new JobGraph("Working testing job", jobVertex); + workingJobGraph = new JobGraph("Working testing job", new ExecutionConfig(), jobVertex); } catch (Exception e) { e.printStackTrace(); @@ -115,7 +116,7 @@ public class JobSubmissionFailsITCase { final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + final JobGraph failingJobGraph = new JobGraph("Failing testing job", new ExecutionConfig(), failingJobVertex); try { submitJob(failingJobGraph); @@ -140,7 +141,7 @@ public class JobSubmissionFailsITCase { @Test public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job"); + final JobGraph jobGraph = new JobGraph("Testing job", new ExecutionConfig()); try { submitJob(jobGraph); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java index e5a494b..c1bd5e2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java @@ -68,6 +68,39 @@ public class MapITCase extends MultipleProgramsTestBase { compareResultAsText(result, expected); } + @Test + public void testRuntimeContextAndExecutionConfigParams() throws Exception { + /* + * Test identity map with basic type + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setNumberOfExecutionRetries(1000); + + DataSet<String> ds = CollectionDataSets.getStringDataSet(env); + DataSet<String> identityMapDs = ds. + map(new RichMapFunction<String, String>() { + @Override + public String map(String value) throws Exception { + Assert.assertTrue(1000 == getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries()); + return value; + } + }); + + List<String> result = identityMapDs.collect(); + + String expected = "Hi\n" + + "Hello\n" + + "Hello world\n" + + "Hello world, how are you?\n" + + "I am fine.\n" + + "Luke Skywalker\n" + + "Random comment\n" + + "LOL\n"; + + compareResultAsText(result, expected); + } + public static class Mapper1 implements MapFunction<String, String> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 5783fcc..e8ad527 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.test.recovery; import akka.actor.ActorRef; import akka.actor.ActorSystem; import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -366,7 +367,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { // BLocking JobGraph JobVertex blockingVertex = new JobVertex("Blocking vertex"); blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class); - JobGraph jobGraph = new JobGraph(blockingVertex); + JobGraph jobGraph = new JobGraph(new ExecutionConfig(), blockingVertex); // Submit the job in detached mode leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index 5080aa2..32423be 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -25,6 +25,7 @@ import akka.actor.UntypedActor; import akka.testkit.TestActorRef; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -428,7 +429,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { * Creates a simple blocking JobGraph. */ private static JobGraph createBlockingJobGraph() { - JobGraph jobGraph = new JobGraph("Blocking program"); + JobGraph jobGraph = new JobGraph("Blocking program", new ExecutionConfig()); JobVertex jobVertex = new JobVertex("Blocking Vertex"); jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index aada364..a870578 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.runtime; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; @@ -96,7 +97,7 @@ public class NetworkStackThroughputITCase { private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver, int numSubtasks) { - JobGraph jobGraph = new JobGraph("Speed Test"); + JobGraph jobGraph = new JobGraph("Speed Test", new ExecutionConfig()); SlotSharingGroup sharingGroup = new SlotSharingGroup(); JobVertex producer = new JobVertex("Speed Test Producer"); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 45ee839..09f9cac 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -22,6 +22,7 @@ import akka.actor.ActorSystem; import akka.actor.Kill; import akka.actor.PoisonPill; import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -167,7 +168,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { sender.setSlotSharingGroup(slotSharingGroup); receiver.setSlotSharingGroup(slotSharingGroup); - final JobGraph graph = new JobGraph("Blocking test job", sender, receiver); + final JobGraph graph = new JobGraph("Blocking test job", new ExecutionConfig(), sender, receiver); final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 2586a27..48888a4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -178,7 +179,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { sender.setParallelism(2); sender.setInvokableClass(StoppableInvokable.class); - final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender); final JobID jid = jobGraph.getJobID(); cluster.submitJobDetached(jobGraph); @@ -206,7 +207,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { sender.setParallelism(2); sender.setInvokableClass(StoppableInvokable.class); - final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender); final JobID jid = jobGraph.getJobID(); cluster.submitJobDetached(jobGraph); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index fddf639..2265b3b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -21,6 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager import akka.actor.{ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable} @@ -93,12 +94,12 @@ class JobManagerFailsITCase(_system: ActorSystem) val sender = new JobVertex("BlockingSender") sender.setParallelism(num_slots) sender.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph("Blocking Testjob", sender) + val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), sender) val noOp = new JobVertex("NoOpInvokable") noOp.setParallelism(num_slots) noOp.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph2 = new JobGraph("NoOp Testjob", noOp) + val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp) val cluster = startDeathwatchCluster(num_slots / 2, 2) http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala index 09af430..9aa1e94 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala @@ -23,6 +23,7 @@ import java.util.UUID import akka.actor.ActorSystem import akka.actor.Status.Success import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils} import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} @@ -64,7 +65,7 @@ class JobManagerLeaderSessionIDITSuite(_system: ActorSystem) val sender = new JobVertex("BlockingSender"); sender.setParallelism(numSlots) sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable]) - val jobGraph = new JobGraph("TestJob", sender) + val jobGraph = new JobGraph("TestJob", new ExecutionConfig(), sender) val oldSessionID = UUID.randomUUID() http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index 869af82..88d760d 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -18,9 +18,9 @@ package org.apache.flink.api.scala.runtime.taskmanager -import akka.actor.Status.{Failure, Success} import akka.actor.{ActorSystem, Kill, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.configuration.ConfigConstants import org.apache.flink.configuration.Configuration @@ -33,7 +33,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobMan import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.apache.flink.runtime.util.SerializedThrowable import org.apache.flink.test.util.ForkableFlinkMiniCluster import org.junit.runner.RunWith @@ -100,7 +99,7 @@ class TaskManagerFailsITCase(_system: ActorSystem) receiver.setParallelism(num_tasks) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val jobID = jobGraph.getJobID val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2) @@ -152,7 +151,7 @@ class TaskManagerFailsITCase(_system: ActorSystem) receiver.setParallelism(num_tasks) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) val jobID = jobGraph.getJobID val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2) @@ -191,12 +190,12 @@ class TaskManagerFailsITCase(_system: ActorSystem) val sender = new JobVertex("BlockingSender") sender.setParallelism(num_slots) sender.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph("Blocking Testjob", sender) + val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), sender) val noOp = new JobVertex("NoOpInvokable") noOp.setParallelism(num_slots) noOp.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph2 = new JobGraph("NoOp Testjob", noOp) + val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp) val cluster = createDeathwatchCluster(num_slots/2, 2)
