Repository: flink Updated Branches: refs/heads/master 099fdfa0c -> 96b353d98
http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 3785fc7..5c25003 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -126,7 +126,7 @@ public class JobManagerTest { sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block sender.createAndAddResultDataSet(rid, PIPELINED); - final JobGraph jobGraph = new JobGraph("Blocking test job", new ExecutionConfig(), sender); + final JobGraph jobGraph = new JobGraph("Blocking test job", sender); final JobID jid = jobGraph.getJobID(); final ActorGateway jobManagerGateway = cluster.getLeaderGateway( @@ -253,7 +253,7 @@ public class JobManagerTest { sender.setParallelism(2); sender.setInvokableClass(StoppableInvokable.class); - final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender); + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); final JobID jid = jobGraph.getJobID(); final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); @@ -305,7 +305,7 @@ public class JobManagerTest { sender.setParallelism(1); sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block - final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new ExecutionConfig(), sender); + final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender); final JobID jid = jobGraph.getJobID(); final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index e820ed6..959b9a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -109,7 +109,7 @@ public class JobSubmitTest { // create a simple job graph JobVertex jobVertex = new JobVertex("Test Vertex"); jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); + JobGraph jg = new JobGraph("test job", jobVertex); // request the blob port from the job manager Future<Object> future = jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout); @@ -173,7 +173,7 @@ public class JobSubmitTest { }; jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); + JobGraph jg = new JobGraph("test job", jobVertex); // submit the job Future<Object> submitFuture = jmGateway.ask( http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index dfb0b91..561bda3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -104,7 +104,7 @@ public class SlotCountExceedingParallelismTest { DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - final JobGraph jobGraph = new JobGraph(jobName, new ExecutionConfig(), sender, receiver); + final JobGraph jobGraph = new JobGraph(jobName, sender, receiver); // We need to allow queued scheduling, because there are not enough slots available // to run all tasks at once. We queue tasks and then let them finish/consume the blocking http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java index ca2ecf5..8ebb7f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java @@ -38,7 +38,7 @@ public class StandaloneSubmittedJobGraphStoreTest { StandaloneSubmittedJobGraphStore jobGraphs = new StandaloneSubmittedJobGraphStore(); SubmittedJobGraph jobGraph = new SubmittedJobGraph( - new JobGraph("testNoOps", new ExecutionConfig()), + new JobGraph("testNoOps"), new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE)); assertEquals(0, jobGraphs.recoverJobGraphs().size()); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index 5e53596..c71bd35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -261,7 +261,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { // --------------------------------------------------------------------------------------------- private SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long start) { - final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph", new ExecutionConfig()); + final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph"); final JobVertex jobVertex = new JobVertex("Test JobVertex"); jobVertex.setParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java index 07fc2c5..b03c38b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java @@ -113,7 +113,6 @@ public class ScheduleOrUpdateConsumersTest { final JobGraph jobGraph = new JobGraph( "Mixed pipelined and blocking result", - new ExecutionConfig(), sender, pipelinedReceiver, blockingReceiver); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java index a2cefb6..ccd2156 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java @@ -140,7 +140,10 @@ public class LeaderChangeJobRecoveryTest extends TestLogger { ExecutionConfig executionConfig = new ExecutionConfig(); - return new JobGraph("Blocking test job", executionConfig, sender, receiver); + JobGraph jobGraph = new JobGraph("Blocking test job", sender, receiver); + jobGraph.setExecutionConfig(executionConfig); + + return jobGraph; } public static class TestActorGateway implements ActorGateway { http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index 6d938ac..2eacdee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -269,6 +269,6 @@ public class LeaderChangeStateCleanupTest extends TestLogger { sender.setSlotSharingGroup(slotSharingGroup); receiver.setSlotSharingGroup(slotSharingGroup); - return new JobGraph("Blocking test job", new ExecutionConfig(), sender, receiver); + return new JobGraph("Blocking test job", sender, receiver); } } http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 9d33920..1b463bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; @@ -149,7 +149,8 @@ public class TaskAsyncCallTest { TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - new ExecutionConfig(), "Test Task", 0, 1, 0, + ExecutionConfigTest.getSerializedConfig(), + "Test Task", 0, 1, 0, new Configuration(), new Configuration(), CheckpointsInOrderInvokable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index f0e72d7..a093233 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -97,7 +97,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { producer.setSlotSharingGroup(slot); consumer.setSlotSharingGroup(slot); - JobGraph jobGraph = new JobGraph(new ExecutionConfig(), producer, consumer); + JobGraph jobGraph = new JobGraph(producer, consumer); // Submit job and wait until running ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft()); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java index e7f4c5c..09dd817 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java @@ -74,7 +74,7 @@ public class TaskCancelTest { flink.start(); // Setup - final JobGraph jobGraph = new JobGraph("Cancel Big Union", new ExecutionConfig()); + final JobGraph jobGraph = new JobGraph("Cancel Big Union"); JobVertex[] sources = new JobVertex[numberOfSources]; SlotSharingGroup group = new SlotSharingGroup(); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 9e7abb6..3ee9a84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -24,6 +24,7 @@ import akka.actor.Props; import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -66,6 +67,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -158,7 +160,7 @@ public class TaskManagerTest extends TestLogger { final JobID jid = new JobID(); final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final ExecutionConfig executionConfig = new ExecutionConfig(); + final SerializedValue<ExecutionConfig> executionConfig = ExecutionConfigTest.getSerializedConfig(); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, executionConfig, "TestTask", 2, 7, 0, new Configuration(), new Configuration(), @@ -262,14 +264,14 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid2 = new ExecutionAttemptID(); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, - new ExecutionConfig(), "TestTask1", 1, 5, 0, + ExecutionConfigTest.getSerializedConfig(), "TestTask1", 1, 5, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, - new ExecutionConfig(), "TestTask2", 2, 7, 0, + ExecutionConfigTest.getSerializedConfig(), "TestTask2", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -395,13 +397,15 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, new ExecutionConfig(), + final SerializedValue<ExecutionConfig> executionConfig = ExecutionConfigTest.getSerializedConfig(); + + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, executionConfig, "TestTask1", 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, new ExecutionConfig(), + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, executionConfig, "TestTask2", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -522,14 +526,14 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid2 = new ExecutionAttemptID(); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, - new ExecutionConfig(), "Sender", 0, 1, 0, + ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, - new ExecutionConfig(), "Receiver", 2, 7, 0, + ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -623,13 +627,13 @@ public class TaskManagerTest extends TestLogger { ); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, - new ExecutionConfig(), "Sender", 0, 1, 0, + ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, - new ExecutionConfig(), "Receiver", 2, 7, 0, + ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.singletonList(ircdd), @@ -764,13 +768,13 @@ public class TaskManagerTest extends TestLogger { ); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, - new ExecutionConfig(), "Sender", 0, 1, 0, + ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, - new ExecutionConfig(), "Receiver", 2, 7, 0, + ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.singletonList(ircdd), @@ -909,7 +913,7 @@ public class TaskManagerTest extends TestLogger { final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( jid, vid, eid, - new ExecutionConfig(), "Receiver", 0, 1, 0, + ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), @@ -1003,7 +1007,7 @@ public class TaskManagerTest extends TestLogger { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, vid, eid, new ExecutionConfig(), "Receiver", 0, 1, 0, + jid, vid, eid, ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), @@ -1079,7 +1083,7 @@ public class TaskManagerTest extends TestLogger { new JobID(), new JobVertexID(), new ExecutionAttemptID(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), "Task", 0, 1, http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index aa37d47..99e037d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -62,7 +63,7 @@ public class TaskStopTest { when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class)); when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class)); when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); - when(tddMock.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class)); when(tddMock.getInvokableClassName()).thenReturn("className"); task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 034681e..06f393f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager; import com.google.common.collect.Maps; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -628,7 +628,8 @@ public class TaskTest { private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) { return new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - new ExecutionConfig(), "Test Task", 0, 1, 0, + ExecutionConfigTest.getSerializedConfig(), + "Test Task", 0, 1, 0, new Configuration(), new Configuration(), invokable.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index ef3dae4..1927c39 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -18,12 +18,12 @@ package org.apache.flink.runtime.executiongraph -import org.apache.flink.api.common.{ExecutionConfig, JobID} +import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest, JobID} import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy -import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils @@ -50,14 +50,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { sender.setInvokableClass(classOf[Tasks.NoOpInvokable]) sender.setParallelism(20) - val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender) + val jobGraph = new JobGraph("Pointwise job", sender) val eg = new ExecutionGraph( TestingUtils.defaultExecutionContext, new JobID(), "test job", new Configuration(), - new ExecutionConfig, + ExecutionConfigTest.getSerializedConfig, AkkaUtils.getDefaultTimeout, new NoRestartStrategy()) http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala index f52d37e..12e2d63 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala @@ -68,7 +68,7 @@ class CoLocationConstraintITCase(_system: ActorSystem) receiver.setStrictlyCoLocatedWith(sender) - val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise job", sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 894ba38..2b5b29f 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -68,7 +68,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(2) vertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex) + val jobGraph = new JobGraph("Test Job", vertex) val cluster = TestingUtils.startTestingCluster(1) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -110,7 +110,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(num_tasks) vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex) + val jobGraph = new JobGraph("Test Job", vertex) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -145,7 +145,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(num_tasks) vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph = new JobGraph("Test job", new ExecutionConfig(), vertex) + val jobGraph = new JobGraph("Test job", vertex) jobGraph.setAllowQueuedScheduling(true) val cluster = TestingUtils.startTestingCluster(10) @@ -181,7 +181,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val cluster = TestingUtils.startTestingCluster(2 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -216,7 +216,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Bipartite Job", sender, receiver) val cluster = TestingUtils.startTestingCluster(2 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -253,8 +253,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), - sender1, receiver, sender2) + val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2) val cluster = TestingUtils.startTestingCluster(6 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -299,8 +298,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), - sender1, receiver, sender2) + val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2) val cluster = TestingUtils.startTestingCluster(6 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -342,8 +340,7 @@ class JobManagerITCase(_system: ActorSystem) forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL) receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Forwarding Job", new ExecutionConfig(), - sender, forwarder, receiver) + val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver) jobGraph.setScheduleMode(ScheduleMode.ALL) @@ -379,7 +376,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -427,7 +424,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -472,7 +469,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise job", sender, receiver) val cluster = TestingUtils.startTestingCluster(2 * num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -512,7 +509,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise job", sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -560,7 +557,7 @@ class JobManagerITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise job", sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -603,8 +600,7 @@ class JobManagerITCase(_system: ActorSystem) source.setParallelism(num_tasks) sink.setParallelism(num_tasks) - val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", - new ExecutionConfig(), source, sink) + val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, sink) val cluster = TestingUtils.startTestingCluster(2*num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -630,12 +626,12 @@ class JobManagerITCase(_system: ActorSystem) val vertex = new JobVertex("Test Vertex") vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph1 = new JobGraph("Test Job", new ExecutionConfig(), vertex) + val jobGraph1 = new JobGraph("Test Job", vertex) val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 2000) slowVertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph2 = new JobGraph("Long running Job", new ExecutionConfig(), slowVertex) + val jobGraph2 = new JobGraph("Long running Job", slowVertex) val cluster = TestingUtils.startTestingCluster(1) val jm = cluster.getLeaderGateway(1 seconds) @@ -684,7 +680,7 @@ class JobManagerITCase(_system: ActorSystem) vertex.setParallelism(1) vertex.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex) + val jobGraph = new JobGraph("Test Job", vertex) val cluster = TestingUtils.startTestingCluster(1) val jm = cluster.getLeaderGateway(1 seconds) @@ -782,7 +778,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) + val jobGraph = new JobGraph(jobVertex) // Submit job w/o checkpointing configured jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor) @@ -815,7 +811,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) + val jobGraph = new JobGraph(jobVertex) jobGraph.setSnapshotSettings(new JobSnapshottingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), @@ -868,7 +864,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) + val jobGraph = new JobGraph(jobVertex) jobGraph.setSnapshotSettings(new JobSnapshottingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), @@ -926,7 +922,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex) + val jobGraph = new JobGraph(jobVertex) jobGraph.setSnapshotSettings(new JobSnapshottingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index ea42cd1..b96369f 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -86,7 +86,8 @@ class RecoveryITCase(_system: ActorSystem) val executionConfig = new ExecutionConfig() executionConfig.setNumberOfExecutionRetries(1); - val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver) + val jobGraph = new JobGraph("Pointwise job", sender, receiver) + jobGraph.setExecutionConfig(executionConfig) val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s") cluster.start() @@ -133,7 +134,8 @@ class RecoveryITCase(_system: ActorSystem) val executionConfig = new ExecutionConfig() executionConfig.setNumberOfExecutionRetries(1); - val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver) + val jobGraph = new JobGraph("Pointwise job", sender, receiver) + jobGraph.setExecutionConfig(executionConfig) val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s") cluster.start() @@ -180,7 +182,8 @@ class RecoveryITCase(_system: ActorSystem) val executionConfig = new ExecutionConfig() executionConfig.setNumberOfExecutionRetries(1); - val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver) + val jobGraph = new JobGraph("Pointwise job", sender, receiver) + jobGraph.setExecutionConfig(executionConfig) val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s") cluster.start() http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala index 4d320ea..f986e73 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala @@ -66,7 +66,7 @@ class SlotSharingITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) @@ -110,8 +110,7 @@ class SlotSharingITCase(_system: ActorSystem) receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE) receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL) - val jobGraph = new JobGraph("Bipartite job", new ExecutionConfig(), - sender1, sender2, receiver) + val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver) val cluster = TestingUtils.startTestingCluster(num_tasks) val jmGateway = cluster.getLeaderGateway(1 seconds) http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala index 100fe66..d0136f0 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala @@ -67,7 +67,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val jobID = jobGraph.getJobID val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) @@ -116,7 +116,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem) sender.setSlotSharingGroup(sharingGroup) receiver.setSlotSharingGroup(sharingGroup) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val jobID = jobGraph.getJobID val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index c3b515e..9adabae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -55,7 +55,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayDeque; import java.util.ArrayList; @@ -110,7 +109,8 @@ public class StreamingJobGraphGenerator { } public JobGraph createJobGraph() { - jobGraph = new JobGraph(streamGraph.getJobName(), streamGraph.getExecutionConfig()); + + jobGraph = new JobGraph(streamGraph.getJobName()); // make sure that all vertices start immediately jobGraph.setScheduleMode(ScheduleMode.ALL); @@ -126,15 +126,11 @@ public class StreamingJobGraphGenerator { setPhysicalEdges(); setSlotSharing(); - + configureCheckpointing(); - try { - // make sure that we can send the ExecutionConfig without user code object problems - jobGraph.getExecutionConfig().serializeUserCode(); - } catch (IOException e) { - throw new IllegalStateException("Could not serialize ExecutionConfig.", e); - } + // set the ExecutionConfig last when it has been finalized + jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); return jobGraph; } http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java index 0517576..f768ace 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -42,7 +43,8 @@ public class RestartStrategyTest { StreamGraph graph = env.getStreamGraph(); JobGraph jobGraph = graph.getJobGraph(); - RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy(); + RestartStrategies.RestartStrategyConfiguration restartStrategy = + ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy(); Assert.assertNotNull(restartStrategy); Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); @@ -64,7 +66,8 @@ public class RestartStrategyTest { StreamGraph graph = env.getStreamGraph(); JobGraph jobGraph = graph.getJobGraph(); - RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy(); + RestartStrategies.RestartStrategyConfiguration restartStrategy = + ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy(); Assert.assertNotNull(restartStrategy); Assert.assertTrue(restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration); @@ -86,7 +89,8 @@ public class RestartStrategyTest { StreamGraph graph = env.getStreamGraph(); JobGraph jobGraph = graph.getJobGraph(); - RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy(); + RestartStrategies.RestartStrategyConfiguration restartStrategy = + ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy(); Assert.assertNotNull(restartStrategy); Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index e9aec48..0de4325 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Random; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -29,6 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import static org.junit.Assert.*; @@ -80,17 +82,19 @@ public class StreamingJobGraphGeneratorTest { final String EXEC_CONFIG_KEY = "runtime.config"; - InstantiationUtil.writeObjectToConfig(jobGraph.getExecutionConfig(), + InstantiationUtil.writeObjectToConfig(jobGraph.getSerializedExecutionConfig(), jobGraph.getJobConfiguration(), EXEC_CONFIG_KEY); - ExecutionConfig executionConfig = InstantiationUtil.readObjectFromConfig( + SerializedValue<ExecutionConfig> serializedExecutionConfig = InstantiationUtil.readObjectFromConfig( jobGraph.getJobConfiguration(), EXEC_CONFIG_KEY, Thread.currentThread().getContextClassLoader()); - - assertNotNull(executionConfig); - + + assertNotNull(serializedExecutionConfig); + + ExecutionConfig executionConfig = ExecutionConfigTest.deserializeConfig(serializedExecutionConfig); + assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled()); assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled()); assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled()); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index 732d3e5..8484e90 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.runtime.partitioner; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple; @@ -135,7 +135,7 @@ public class RescalePartitionerTest extends TestLogger { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy(), new ArrayList<BlobKey>(), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 7f4492a..ed8bf01 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import akka.actor.ActorRef; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; @@ -135,7 +135,8 @@ public class StreamTaskTest { TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - new ExecutionConfig(), "Test Task", 0, 1, 0, + ExecutionConfigTest.getSerializedConfig(), + "Test Task", 0, 1, 0, new Configuration(), taskConfig.getConfiguration(), invokable.getName(), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java index f6c22d4..28c2e58 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.failingPrograms; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -65,7 +64,7 @@ public class JobSubmissionFailsITCase { final JobVertex jobVertex = new JobVertex("Working job vertex."); jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - workingJobGraph = new JobGraph("Working testing job", new ExecutionConfig(), jobVertex); + workingJobGraph = new JobGraph("Working testing job", jobVertex); } catch (Exception e) { e.printStackTrace(); @@ -116,7 +115,7 @@ public class JobSubmissionFailsITCase { final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - final JobGraph failingJobGraph = new JobGraph("Failing testing job", new ExecutionConfig(), failingJobVertex); + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); try { submitJob(failingJobGraph); @@ -141,7 +140,7 @@ public class JobSubmissionFailsITCase { @Test public void testSubmitEmptyJobGraph() { try { - final JobGraph jobGraph = new JobGraph("Testing job", new ExecutionConfig()); + final JobGraph jobGraph = new JobGraph("Testing job"); try { submitJob(jobGraph); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index f4d88a8..b9284dc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -21,7 +21,6 @@ package org.apache.flink.test.recovery; import akka.actor.ActorRef; import akka.actor.ActorSystem; import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -368,7 +367,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { // BLocking JobGraph JobVertex blockingVertex = new JobVertex("Blocking vertex"); blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class); - JobGraph jobGraph = new JobGraph(new ExecutionConfig(), blockingVertex); + JobGraph jobGraph = new JobGraph(blockingVertex); // Submit the job in detached mode leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index 2418853..b4ffbd4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -440,7 +440,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { * Creates a simple blocking JobGraph. */ private static JobGraph createBlockingJobGraph() { - JobGraph jobGraph = new JobGraph("Blocking program", new ExecutionConfig()); + JobGraph jobGraph = new JobGraph("Blocking program"); JobVertex jobVertex = new JobVertex("Blocking Vertex"); jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index a870578..06df46f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -97,7 +97,7 @@ public class NetworkStackThroughputITCase { private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver, int numSubtasks) { - JobGraph jobGraph = new JobGraph("Speed Test", new ExecutionConfig()); + JobGraph jobGraph = new JobGraph("Speed Test"); SlotSharingGroup sharingGroup = new SlotSharingGroup(); JobVertex producer = new JobVertex("Speed Test Producer"); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 09f9cac..45ee839 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -22,7 +22,6 @@ import akka.actor.ActorSystem; import akka.actor.Kill; import akka.actor.PoisonPill; import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -168,7 +167,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { sender.setSlotSharingGroup(slotSharingGroup); receiver.setSlotSharingGroup(slotSharingGroup); - final JobGraph graph = new JobGraph("Blocking test job", new ExecutionConfig(), sender, receiver); + final JobGraph graph = new JobGraph("Blocking test job", sender, receiver); final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index e7b37d7..9cc90a1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -213,7 +212,7 @@ public class WebFrontendITCase { sender.setParallelism(2); sender.setInvokableClass(StoppableInvokable.class); - final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender); + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); final JobID jid = jobGraph.getJobID(); cluster.submitJobDetached(jobGraph); @@ -251,7 +250,7 @@ public class WebFrontendITCase { sender.setParallelism(2); sender.setInvokableClass(StoppableInvokable.class); - final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender); + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); final JobID jid = jobGraph.getJobID(); cluster.submitJobDetached(jobGraph); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index 2265b3b..ac661f3 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -21,7 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager import akka.actor.{ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable} @@ -94,12 +94,12 @@ class JobManagerFailsITCase(_system: ActorSystem) val sender = new JobVertex("BlockingSender") sender.setParallelism(num_slots) sender.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), sender) + val jobGraph = new JobGraph("Blocking Testjob", sender) val noOp = new JobVertex("NoOpInvokable") noOp.setParallelism(num_slots) noOp.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp) + val jobGraph2 = new JobGraph("NoOp Testjob", noOp) val cluster = startDeathwatchCluster(num_slots / 2, 2) http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala index 9aa1e94..78bc0ee 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala @@ -65,7 +65,7 @@ class JobManagerLeaderSessionIDITSuite(_system: ActorSystem) val sender = new JobVertex("BlockingSender"); sender.setParallelism(numSlots) sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable]) - val jobGraph = new JobGraph("TestJob", new ExecutionConfig(), sender) + val jobGraph = new JobGraph("TestJob", sender) val oldSessionID = UUID.randomUUID() http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index f1e115d..258f6df 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -20,21 +20,19 @@ package org.apache.flink.api.scala.runtime.taskmanager import akka.actor.{ActorSystem, Kill, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.api.common.ExecutionConfig - +import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest} import org.apache.flink.configuration.ConfigConstants import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils} +import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.client.JobExecutionException -import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph} -import org.apache.flink.runtime.jobmanager.Tasks.{NoOpInvokable, BlockingNoOpInvokable, BlockingReceiver, Sender} +import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex} +import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, BlockingReceiver, NoOpInvokable, Sender} import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobManager, NotifyWhenRegisteredAtJobManager} +import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} import org.apache.flink.test.util.ForkableFlinkMiniCluster - import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -99,7 +97,7 @@ class TaskManagerFailsITCase(_system: ActorSystem) receiver.setParallelism(num_tasks) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val jobID = jobGraph.getJobID val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2) @@ -151,7 +149,7 @@ class TaskManagerFailsITCase(_system: ActorSystem) receiver.setParallelism(num_tasks) receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) - val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver) + val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val jobID = jobGraph.getJobID val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2) @@ -190,12 +188,12 @@ class TaskManagerFailsITCase(_system: ActorSystem) val sender = new JobVertex("BlockingSender") sender.setParallelism(num_slots) sender.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), sender) + val jobGraph = new JobGraph("Blocking Testjob", sender) val noOp = new JobVertex("NoOpInvokable") noOp.setParallelism(num_slots) noOp.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp) + val jobGraph2 = new JobGraph("NoOp Testjob", noOp) val cluster = createDeathwatchCluster(num_slots/2, 2)
