http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 88f9ce0..4cba4e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -98,6 +98,7 @@ public class ArchivedExecutionGraphTest { runtimeGraph = new ExecutionGraph( TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index 6f6fcd0..bf3a17c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -112,7 +112,8 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -161,7 +162,8 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -235,7 +237,8 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -494,7 +497,8 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -558,7 +562,8 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -626,7 +631,8 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -672,7 +678,8 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -753,7 +760,8 @@ public class ExecutionGraphConstructionTest { JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index d4acd8c..ef4f74c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -89,7 +89,8 @@ public class ExecutionGraphDeploymentTest { v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, "some job", new Configuration(), @@ -313,6 +314,7 @@ public class ExecutionGraphDeploymentTest { // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( TestingUtils.directExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, "failing test job", new Configuration(), @@ -356,7 +358,8 @@ public class ExecutionGraphDeploymentTest { // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( - TestingUtils.directExecutionContext(), + TestingUtils.directExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, "some job", new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 09e7c3e..d8d8e24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -147,6 +147,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { ExecutionGraph executionGraph = new ExecutionGraph( executor, + executor, jobGraph.getJobID(), jobGraph.getName(), jobConfig, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index d6770a6..52bfc96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -232,6 +232,7 @@ public class ExecutionGraphRestartTest extends TestLogger { // Blocking program ExecutionGraph executionGraph = new ExecutionGraph( TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), new JobID(), "TestJob", new Configuration(), @@ -547,6 +548,7 @@ public class ExecutionGraphRestartTest extends TestLogger { ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), new JobID(), "Test job", new Configuration(), @@ -679,13 +681,14 @@ public class ExecutionGraphRestartTest extends TestLogger { private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException { return new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - new JobID(), - "Test job", - new Configuration(), - new SerializedValue<>(new ExecutionConfig()), - AkkaUtils.getDefaultTimeout(), - restartStrategy); + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + new JobID(), + "Test job", + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), + AkkaUtils.getDefaultTimeout(), + restartStrategy); } private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index de4a026..fde967e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -133,6 +133,7 @@ public class ExecutionGraphSignalsTest { eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 28dff02..71ae3b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -174,6 +174,7 @@ public class ExecutionGraphTestUtils { ExecutionGraph graph = new ExecutionGraph( executor, + executor, new JobID(), "test job", new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 8434ed7..19e2d6d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -52,7 +52,8 @@ public class ExecutionStateProgressTest { ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); ExecutionGraph graph = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jid, "test job", new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 7a28b4a..0e147e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -66,7 +66,8 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -111,7 +112,8 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -157,7 +159,8 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -204,7 +207,8 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -249,7 +253,8 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -314,7 +319,8 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, @@ -370,7 +376,8 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 4459970..5b1a03e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -181,6 +181,7 @@ public class TerminalStateDeadlockTest { TestExecGraph(JobID jobId) throws IOException { super( TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, "test graph", EMPTY_CONFIG, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 0c95695..27708a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -80,13 +80,14 @@ public class VertexSlotSharingTest { List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - new JobID(), - "test job", - new Configuration(), - new SerializedValue<>(new ExecutionConfig()), - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), + new JobID(), + "test job", + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); eg.attachJobGraph(vertices); // verify that the vertices are all in the same slot sharing group http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java index c0d59fe..4beedb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java @@ -39,7 +39,7 @@ public class FixedDelayRestartStrategyTest { restartDelay); ExecutionGraph executionGraph = mock(ExecutionGraph.class); - when(executionGraph.getExecutor()) + when(executionGraph.getFutureExecutor()) .thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor())); while(fixedDelayRestartStrategy.canRestart()) { http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 7d9c521..69aac31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -90,6 +90,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @@ -135,6 +136,8 @@ public class JobManagerHARecoveryTest { flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + ExecutorService executor = null; + try { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); @@ -152,21 +155,24 @@ public class JobManagerHARecoveryTest { MemoryArchivist.class, 10), "archive"); + executor = new ForkJoinPool(); + Props jobManagerProps = Props.create( - TestingJobManager.class, - flinkConfiguration, - new ForkJoinPool(), - instanceManager, - scheduler, - new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), - archive, - new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), - timeout, - myLeaderElectionService, - mySubmittedJobGraphStore, - checkpointStateFactory, - jobRecoveryTimeout, - Option.apply(null)); + TestingJobManager.class, + flinkConfiguration, + executor, + executor, + instanceManager, + scheduler, + new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), + archive, + new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), + timeout, + myLeaderElectionService, + mySubmittedJobGraphStore, + checkpointStateFactory, + jobRecoveryTimeout, + Option.apply(null)); jobManager = system.actorOf(jobManagerProps, "jobmanager"); ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID); @@ -282,6 +288,10 @@ public class JobManagerHARecoveryTest { if (taskManager != null) { taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); } + + if (executor != null) { + executor.shutdownNow(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index 082b1de..f051281 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -183,20 +183,21 @@ public class JobManagerLeaderElectionTest extends TestLogger { CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); return Props.create( - TestingJobManager.class, - configuration, - executor, - new InstanceManager(), - new Scheduler(TestingUtils.defaultExecutionContext()), - new BlobLibraryCacheManager(new BlobServer(configuration), 10L), - ActorRef.noSender(), - new NoRestartStrategy.NoRestartStrategyFactory(), - AkkaUtils.getDefaultTimeoutAsFiniteDuration(), - leaderElectionService, - submittedJobGraphStore, - checkpointRecoveryFactory, - AkkaUtils.getDefaultTimeoutAsFiniteDuration(), - Option.apply(null) + TestingJobManager.class, + configuration, + executor, + executor, + new InstanceManager(), + new Scheduler(TestingUtils.defaultExecutionContext()), + new BlobLibraryCacheManager(new BlobServer(configuration), 10L), + ActorRef.noSender(), + new NoRestartStrategy.NoRestartStrategyFactory(), + AkkaUtils.getDefaultTimeoutAsFiniteDuration(), + leaderElectionService, + submittedJobGraphStore, + checkpointRecoveryFactory, + AkkaUtils.getDefaultTimeoutAsFiniteDuration(), + Option.apply(null) ); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 68cb668..1cbd605 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -58,6 +58,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { val eg = new ExecutionGraph( TestingUtils.defaultExecutionContext, + TestingUtils.defaultExecutionContext, new JobID(), "test job", new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 50a5559..269a66f 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.testingUtils -import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} +import java.util.concurrent.{Executor, ExecutorService, TimeUnit, TimeoutException} import akka.actor.{ActorRef, ActorSystem, Props} import akka.pattern.Patterns._ @@ -80,7 +80,8 @@ class TestingCluster( override def getJobManagerProps( jobManagerClass: Class[_ <: JobManager], configuration: Configuration, - executorService: ExecutorService, + futureExecutor: Executor, + ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -96,7 +97,8 @@ class TestingCluster( val props = super.getJobManagerProps( jobManagerClass, configuration, - executorService, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index c6fd923..39c7a53 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -39,7 +39,8 @@ import scala.language.postfixOps */ class TestingJobManager( flinkConfiguration: Configuration, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -53,7 +54,8 @@ class TestingJobManager( metricRegistry : Option[MetricRegistry]) extends JobManager( flinkConfiguration, - executor, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index 37ea68a..e77cbb3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -139,6 +139,7 @@ public class RescalePartitionerTest extends TestLogger { ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index aabc19d..5244124 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -40,8 +40,9 @@ import scala.concurrent.duration.FiniteDuration * instead of an anonymous class with the respective mixin to obtain a more readable logger name. * * @param flinkConfiguration Configuration object for the actor - * @param executor Execution context which is used to execute concurrent tasks in the + * @param futureExecutor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param ioExecutor for blocking io operations * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] * @param scheduler Scheduler to schedule Flink jobs @@ -53,7 +54,8 @@ import scala.concurrent.duration.FiniteDuration */ class TestingYarnJobManager( flinkConfiguration: Configuration, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -67,7 +69,8 @@ class TestingYarnJobManager( metricRegistry : Option[MetricRegistry]) extends YarnJobManager( flinkConfiguration, - executor, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index da5959b..8f2cc33 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Some; import scala.concurrent.duration.FiniteDuration; import java.io.File; @@ -339,8 +341,8 @@ public class YarnApplicationMasterRunner { actorSystem, futureExecutor, ioExecutor, - new scala.Some<>(JobManager.JOB_MANAGER_NAME()), - scala.Option.<String>empty(), + new Some<>(JobManager.JOB_MANAGER_NAME()), + Option.<String>empty(), getJobManagerClass(), getArchivistClass())._1(); @@ -379,7 +381,6 @@ public class YarnApplicationMasterRunner { ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps); - // 4: Process reapers // The process reapers ensure that upon unexpected actor death, the process exits // and does not stay lingering around unresponsive @@ -414,6 +415,9 @@ public class YarnApplicationMasterRunner { } } + futureExecutor.shutdownNow(); + ioExecutor.shutdownNow(); + return INIT_ERROR_EXIT_CODE; } @@ -432,8 +436,11 @@ public class YarnApplicationMasterRunner { } } - futureExecutor.shutdownNow(); - ioExecutor.shutdownNow(); + org.apache.flink.runtime.concurrent.Executors.gracefulShutdown( + AkkaUtils.getTimeout(config).toMillis(), + TimeUnit.MILLISECONDS, + futureExecutor, + ioExecutor); return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index a81e6cf..db4eea8 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -40,8 +40,9 @@ import scala.language.postfixOps * to start/administer/stop the Yarn session. * * @param flinkConfiguration Configuration object for the actor - * @param executor Execution context which is used to execute concurrent tasks in the + * @param futureExecutor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param ioExecutor for blocking io operations * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] * @param scheduler Scheduler to schedule Flink jobs @@ -53,7 +54,8 @@ import scala.language.postfixOps */ class YarnJobManager( flinkConfiguration: FlinkConfiguration, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -67,7 +69,8 @@ class YarnJobManager( metricsRegistry: Option[MetricRegistry]) extends ContaineredJobManager( flinkConfiguration, - executor, + futureExecutor, + ioExecutor, instanceManager, scheduler, libraryCacheManager,
