[FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor Before the scheduler was set when calling ExecutionGraph.scheduleForExecution(). This has the disadvantage that the ExecutionGraph has not scheduler set if something else went wrong before the scheduleForExecution call. Consequently, the job will be stuck in a restart loop because the recovery will fail if there is no Scheduler set. In order to solve the problem, the Scheduler is not passed to the ExecutionGraph when it is created.
This closes #3440. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c22efce0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c22efce0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c22efce0 Branch: refs/heads/release-1.2 Commit: c22efce098c14e8f08bad1e0065dbd02df6e4dbb Parents: 53b4542 Author: Till Rohrmann <[email protected]> Authored: Tue Feb 28 15:20:47 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Mar 2 13:59:41 2017 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 65 +++++++++----------- .../executiongraph/ExecutionGraphBuilder.java | 3 + .../flink/runtime/jobmanager/JobManager.scala | 3 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 2 + .../ArchivedExecutionGraphTest.java | 5 +- .../ExecutionGraphConstructionTest.java | 25 +++++--- .../ExecutionGraphDeploymentTest.java | 47 +++++++------- .../ExecutionGraphMetricsTest.java | 3 +- .../ExecutionGraphRestartTest.java | 39 +++++++----- .../ExecutionGraphSignalsTest.java | 4 +- .../executiongraph/ExecutionGraphTestUtils.java | 5 +- .../ExecutionStateProgressTest.java | 4 +- .../executiongraph/LegacyJobVertexIdTest.java | 18 +++--- .../executiongraph/PointwisePatternTest.java | 22 ++++--- .../TerminalStateDeadlockTest.java | 5 +- .../executiongraph/VertexSlotSharingTest.java | 4 +- .../TaskManagerLossFailsTasksTest.scala | 5 +- .../partitioner/RescalePartitionerTest.java | 2 + 18 files changed, 155 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 66aec8b..10b2c51 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -128,6 +128,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive */ private final SerializedValue<JobInformation> serializedJobInformation; + /** The executor which is used to execute futures. */ + private final Executor futureExecutor; + + /** The executor which is used to execute blocking io operations */ + private final Executor ioExecutor; + /** {@code true} if all source tasks are stoppable. */ private boolean isStoppable = true; @@ -159,6 +165,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive /** The timeout for all messages that require a response/acknowledgement */ private final Time timeout; + /** Strategy to use for restarts */ + private final RestartStrategy restartStrategy; + + /** The slot provider to use for allocating slots for tasks as they are needed */ + private final SlotProvider slotProvider; + + /** The classloader for the user code. Needed for calls into user code classes */ + private final ClassLoader userClassLoader; + + /** Registered KvState instances reported by the TaskManagers. */ + private final KvStateLocationRegistry kvStateLocationRegistry; + // ------ Configuration of the Execution ------- /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able @@ -184,15 +202,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- - /** The slot provider to use for allocating slots for tasks as they are needed */ - private SlotProvider slotProvider; - - /** Strategy to use for restarts */ - private RestartStrategy restartStrategy; - - /** The classloader for the user code. Needed for calls into user code classes */ - private ClassLoader userClassLoader; - /** The coordinator for checkpoints, if snapshot checkpoints are enabled */ private CheckpointCoordinator checkpointCoordinator; @@ -200,15 +209,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive * available after archiving. */ private CheckpointStatsTracker checkpointStatsTracker; - /** The executor which is used to execute futures. */ - private final Executor futureExecutor; - - /** The executor which is used to execute blocking io operations */ - private final Executor ioExecutor; - - /** Registered KvState instances reported by the TaskManagers. */ - private KvStateLocationRegistry kvStateLocationRegistry; - // ------ Fields that are only relevant for archived execution graphs ------------ private String jsonPlan; @@ -227,7 +227,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive Configuration jobConfig, SerializedValue<ExecutionConfig> serializedConfig, Time timeout, - RestartStrategy restartStrategy) throws IOException { + RestartStrategy restartStrategy, + SlotProvider slotProvider) throws IOException { this( futureExecutor, ioExecutor, @@ -239,6 +240,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive restartStrategy, new ArrayList<BlobKey>(), new ArrayList<URL>(), + slotProvider, ExecutionGraph.class.getClassLoader(), new UnregisteredMetricsGroup() ); @@ -255,6 +257,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive RestartStrategy restartStrategy, List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths, + SlotProvider slotProvider, ClassLoader userClassLoader, MetricGroup metricGroup) throws IOException { @@ -262,7 +265,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive checkNotNull(jobId); checkNotNull(jobName); checkNotNull(jobConfig); - checkNotNull(userClassLoader); this.jobInformation = new JobInformation( jobId, @@ -278,12 +280,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive this.futureExecutor = Preconditions.checkNotNull(futureExecutor); this.ioExecutor = Preconditions.checkNotNull(ioExecutor); - this.userClassLoader = userClassLoader; + this.slotProvider = Preconditions.checkNotNull(slotProvider, "scheduler"); + this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader"); - this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>(); - this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>(); - this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>(); - this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>(); + this.tasks = new ConcurrentHashMap<>(16); + this.intermediateResults = new ConcurrentHashMap<>(16); + this.verticesInCreationOrder = new ArrayList<>(16); + this.currentExecutions = new ConcurrentHashMap<>(16); this.jobStatusListeners = new CopyOnWriteArrayList<>(); this.executionListeners = new CopyOnWriteArrayList<>(); @@ -700,17 +703,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } - public void scheduleForExecution(SlotProvider slotProvider) throws JobException { - if (slotProvider == null) { - throw new IllegalArgumentException("Scheduler must not be null."); - } - - if (this.slotProvider != null && this.slotProvider != slotProvider) { - throw new IllegalArgumentException("Cannot use different slot providers for the same job"); - } + public void scheduleForExecution() throws JobException { if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { - this.slotProvider = slotProvider; switch (scheduleMode) { @@ -914,7 +909,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } - scheduleForExecution(slotProvider); + scheduleForExecution(); } catch (Throwable t) { LOG.warn("Failed to restart the job.", t); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 386f202..4545777 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -62,6 +63,7 @@ public class ExecutionGraphBuilder { Configuration jobManagerConfig, Executor futureExecutor, Executor ioExecutor, + SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, Time timeout, @@ -92,6 +94,7 @@ public class ExecutionGraphBuilder { restartStrategy, jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths(), + slotProvider, classLoader, metrics); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 3025727..00128a0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1287,6 +1287,7 @@ class JobManager( flinkConfiguration, futureExecutor, ioExecutor, + scheduler, userCodeLoader, checkpointRecoveryFactory, Time.of(timeout.length, timeout.unit), @@ -1405,7 +1406,7 @@ class JobManager( // the job. log.info(s"Scheduling job $jobId ($jobName).") - executionGraph.scheduleForExecution(scheduler) + executionGraph.scheduleForExecution() } else { // Remove the job graph. Otherwise it will be lingering around and possibly removed from // ZooKeeper by this JM. http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index bc95de7..99305e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.junit.AfterClass; @@ -103,6 +104,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { new NoRestartStrategy(), Collections.<BlobKey>emptyList(), Collections.<URL>emptyList(), + new Scheduler(TestingUtils.defaultExecutionContext()), ClassLoader.getSystemClassLoader(), new UnregisteredMetricsGroup()); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 9b1064d..ae1a6d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -103,7 +104,9 @@ public class ArchivedExecutionGraphTest { new Configuration(), new SerializedValue<>(config), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + mock(SlotProvider.class)); + runtimeGraph.attachJobGraph(vertices); CheckpointStatsTracker statsTracker = new CheckpointStatsTracker( http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index bf3a17c..ea25dca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -119,7 +120,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -169,7 +171,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -244,7 +247,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -504,7 +508,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -569,7 +574,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -638,7 +644,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -685,7 +692,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); @@ -767,7 +775,8 @@ public class ExecutionGraphConstructionTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index ef4f74c..bff4584 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -96,7 +96,8 @@ public class ExecutionGraphDeploymentTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4); @@ -311,6 +312,15 @@ public class ExecutionGraphDeploymentTest { v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); + for (int i = 0; i < dop1; i++) { + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ActorTaskManagerGateway( + new ExecutionGraphTestUtils.SimpleActorGateway( + TestingUtils.directExecutionContext())))); + } + // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( TestingUtils.directExecutionContext(), @@ -320,25 +330,18 @@ public class ExecutionGraphDeploymentTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + scheduler); eg.setQueuedSchedulingAllowed(false); List<JobVertex> ordered = Arrays.asList(v1, v2); eg.attachJobGraph(ordered); - Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); - for (int i = 0; i < dop1; i++) { - scheduler.newInstanceAvailable( - ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new ExecutionGraphTestUtils.SimpleActorGateway( - TestingUtils.directExecutionContext())))); - } assertEquals(dop1, scheduler.getNumberOfAvailableSlots()); // schedule, this triggers mock deployment - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(); eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING)); @@ -356,6 +359,15 @@ public class ExecutionGraphDeploymentTest { v1.setInvokableClass(BatchTask.class); v2.setInvokableClass(BatchTask.class); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + for (int i = 0; i < dop1 + dop2; i++) { + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ActorTaskManagerGateway( + new ExecutionGraphTestUtils.SimpleActorGateway( + TestingUtils.directExecutionContext())))); + } + // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( TestingUtils.directExecutionContext(), @@ -365,25 +377,18 @@ public class ExecutionGraphDeploymentTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + scheduler); eg.setQueuedSchedulingAllowed(false); List<JobVertex> ordered = Arrays.asList(v1, v2); eg.attachJobGraph(ordered); - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - for (int i = 0; i < dop1 + dop2; i++) { - scheduler.newInstanceAvailable( - ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new ExecutionGraphTestUtils.SimpleActorGateway( - TestingUtils.directExecutionContext())))); - } assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots()); // schedule, this triggers mock deployment - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions(); assertEquals(dop1 + dop2, executions.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index d8d8e24..02e2d38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -156,6 +156,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { testingRestartStrategy, Collections.<BlobKey>emptyList(), Collections.<URL>emptyList(), + scheduler, getClass().getClassLoader(), metricGroup); @@ -174,7 +175,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); // start execution - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(); assertTrue(0L == restartingTime.getValue()); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 52bfc96..9ef72a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -115,12 +115,12 @@ public class ExecutionGraphRestartTest extends TestLogger { //initiate and schedule job JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); //sanity checks @@ -239,7 +239,8 @@ public class ExecutionGraphRestartTest extends TestLogger { new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay - new InfiniteDelayRestartStrategy()); + new InfiniteDelayRestartStrategy(), + scheduler); JobVertex jobVertex = new JobVertex("NoOpInvokable"); jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); @@ -251,7 +252,7 @@ public class ExecutionGraphRestartTest extends TestLogger { assertEquals(JobStatus.CREATED, executionGraph.getState()); - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(); assertEquals(JobStatus.RUNNING, executionGraph.getState()); @@ -383,12 +384,12 @@ public class ExecutionGraphRestartTest extends TestLogger { JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class); JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator(); @@ -452,13 +453,13 @@ public class ExecutionGraphRestartTest extends TestLogger { JobGraph jobGraph = new JobGraph("Test Job", vertex); jobGraph.setExecutionConfig(executionConfig); - ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy()); + ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); // Fail right after cancel (for example with concurrent slot release) @@ -498,13 +499,13 @@ public class ExecutionGraphRestartTest extends TestLogger { JobGraph jobGraph = new JobGraph("Test Job", vertex); jobGraph.setExecutionConfig(executionConfig); - ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy()); + ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); // Fail right after cancel (for example with concurrent slot release) @@ -554,13 +555,14 @@ public class ExecutionGraphRestartTest extends TestLogger { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - controllableRestartStrategy); + controllableRestartStrategy, + scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); @@ -659,7 +661,7 @@ public class ExecutionGraphRestartTest extends TestLogger { JobGraph jobGraph = new JobGraph("Pointwise job", sender); - ExecutionGraph eg = newExecutionGraph(restartStrategy); + ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler); if (isSpy) { eg = spy(eg); } @@ -667,7 +669,7 @@ public class ExecutionGraphRestartTest extends TestLogger { assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); return new Tuple2<>(eg, instance); } @@ -679,7 +681,7 @@ public class ExecutionGraphRestartTest extends TestLogger { return groupVertex; } - private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException { + private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, Scheduler scheduler) throws IOException { return new ExecutionGraph( TestingUtils.defaultExecutionContext(), TestingUtils.defaultExecutionContext(), @@ -688,7 +690,12 @@ public class ExecutionGraphRestartTest extends TestLogger { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - restartStrategy); + restartStrategy, + scheduler); + } + + private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException { + return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext())); } private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index fde967e..fab6505 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -139,7 +140,8 @@ public class ExecutionGraphSignalsTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(ordered); f = eg.getClass().getDeclaredField("state"); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 71ae3b6..ef4926e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -46,6 +46,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; @@ -55,6 +56,7 @@ import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; public class ExecutionGraphTestUtils { @@ -180,7 +182,8 @@ public class ExecutionGraphTestUtils { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor))); ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout())); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 19e2d6d..e458d02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -59,7 +60,8 @@ public class ExecutionStateProgressTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); graph.attachJobGraph(Collections.singletonList(ajv)); setGraphStatus(graph, JobStatus.RUNNING); http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java index 44dc0a4..a4da0bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -49,13 +50,14 @@ public class LegacyJobVertexIdTest { ExecutionGraph executionGraph = new ExecutionGraph( mock(Executor.class), - mock(Executor.class), - new JobID(), - "test", - mock(Configuration.class), - mock(SerializedValue.class), - Time.seconds(1), - mock(RestartStrategy.class)); + mock(Executor.class), + new JobID(), + "test", + mock(Configuration.class), + mock(SerializedValue.class), + Time.seconds(1), + mock(RestartStrategy.class), + mock(SlotProvider.class)); ExecutionJobVertex executionJobVertex = new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1)); @@ -74,4 +76,4 @@ public class LegacyJobVertexIdTest { Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1)); Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 0e147e3..b59dcda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -73,7 +74,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -119,7 +121,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -166,7 +169,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -214,7 +218,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -260,7 +265,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -326,7 +332,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -383,7 +390,8 @@ public class PointwisePatternTest { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 5b1a03e..32ae093 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -187,11 +187,12 @@ public class TerminalStateDeadlockTest { EMPTY_CONFIG, new SerializedValue<>(new ExecutionConfig()), TIMEOUT, - new FixedDelayRestartStrategy(1, 0)); + new FixedDelayRestartStrategy(1, 0), + new Scheduler(TestingUtils.defaultExecutionContext())); } @Override - public void scheduleForExecution(SlotProvider slotProvider) { + public void scheduleForExecution() { // notify that we are done with the "restarting" synchronized (this) { done = true; http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 27708a2..5c85592 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -87,7 +88,8 @@ public class VertexSlotSharingTest { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(vertices); // verify that the vertices are all in the same slot sharing group http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 1cbd605..5e4a802 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -64,13 +64,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { new Configuration(), new SerializedValue(new ExecutionConfig()), AkkaUtils.getDefaultTimeout, - new NoRestartStrategy()) + new NoRestartStrategy(), + scheduler) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) eg.getState should equal(JobStatus.CREATED) - eg.scheduleForExecution(scheduler) + eg.scheduleForExecution() eg.getState should equal(JobStatus.RUNNING) instance1.markDead() http://git-wip-us.apache.org/repos/asf/flink/blob/c22efce0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index e77cbb3..2e05101 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.streaming.api.datastream.DataStream; @@ -148,6 +149,7 @@ public class RescalePartitionerTest extends TestLogger { new NoRestartStrategy(), new ArrayList<BlobKey>(), new ArrayList<URL>(), + new Scheduler(TestingUtils.defaultExecutionContext()), ExecutionGraph.class.getClassLoader(), new UnregisteredMetricsGroup()); try {
