[hotfix] [tests] Simplify JobMasterTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e85bb07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e85bb07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e85bb07

Branch: refs/heads/master
Commit: 9e85bb0780978e08f00da9c7bc4529b4dd2abafd
Parents: d8a8866
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 13 15:16:27 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Feb 18 10:12:55 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMasterTest.java | 227 ++++++++++---------
 1 file changed, 126 insertions(+), 101 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9e85bb07/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e8f0bc4..e142d9c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import
org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; @@ -39,6 +38,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -48,10 +48,16 @@ import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; -import java.net.URL; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -63,23 +69,85 @@ import static org.hamcrest.MatcherAssert.assertThat; @Category(Flip6.class) public class JobMasterTest extends TestLogger { - private final Time testingTimeout = Time.seconds(10L); + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - @Test - public void testHeartbeatTimeoutWithTaskManager() throws Exception { - final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService( + private static final Time 
testingTimeout = Time.seconds(10L); + + private static final long heartbeatInterval = 1L; + + private static final long heartbeatTimeout = 5L; + + private static final JobGraph jobGraph = new JobGraph(); + + private static TestingRpcService rpcService; + + private static HeartbeatServices fastHeartbeatServices; + + private BlobServer blobServer; + + private Configuration configuration; + + private ResourceID jmResourceId; + + private JobMasterId jobMasterId; + + private TestingHighAvailabilityServices haServices; + + private TestingLeaderRetrievalService rmLeaderRetrievalService; + + private TestingFatalErrorHandler testingFatalErrorHandler; + + @BeforeClass + public static void setupClass() { + rpcService = new TestingRpcService(); + + fastHeartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor()); + } + + @Before + public void setup() throws IOException { + configuration = new Configuration(); + haServices = new TestingHighAvailabilityServices(); + jobMasterId = JobMasterId.generate(); + jmResourceId = ResourceID.generate(); + + testingFatalErrorHandler = new TestingFatalErrorHandler(); + + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + + rmLeaderRetrievalService = new TestingLeaderRetrievalService( null, null); haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); - haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); - final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - final String jobManagerAddress = "jm"; - final JobMasterId jobMasterId = JobMasterId.generate(); - final ResourceID jmResourceId = new ResourceID(jobManagerAddress); + configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + blobServer = new BlobServer(configuration, new VoidBlobStore()); + + blobServer.start(); + } + + @After + public void teardown() 
throws Exception { + if (testingFatalErrorHandler != null) { + testingFatalErrorHandler.rethrowError(); + } - final String taskManagerAddress = "tm"; + if (blobServer != null) { + blobServer.close(); + } + } + + @AfterClass + public static void teardownClass() { + if (rpcService != null) { + rpcService.stopService(); + rpcService = null; + } + } + + @Test + public void testHeartbeatTimeoutWithTaskManager() throws Exception { final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway(); @@ -89,51 +157,40 @@ public class JobMasterTest extends TestLogger { taskExecutorGateway.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete); taskExecutorGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0)); - final TestingRpcService rpc = new TestingRpcService(); - rpc.registerGateway(taskManagerAddress, taskExecutorGateway); - - final long heartbeatInterval = 1L; - final long heartbeatTimeout = 5L; - - final ScheduledExecutor scheduledExecutor = rpc.getScheduledExecutor(); - final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); - - final JobGraph jobGraph = new JobGraph(); - - Configuration configuration = new Configuration(); + rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); - try (BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore())) { - blobServer.start(); - - final JobMaster jobMaster = new JobMaster( - rpc, - jobMasterConfiguration, - jmResourceId, - jobGraph, - haServices, - jobManagerSharedServices, - heartbeatServices, - blobServer, - null, - new 
NoOpOnCompletionActions(), - testingFatalErrorHandler, - FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()), - null, - null); - - CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); + final JobMaster jobMaster = new JobMaster( + rpcService, + jobMasterConfiguration, + jmResourceId, + jobGraph, + haServices, + jobManagerSharedServices, + fastHeartbeatServices, + blobServer, + null, + new NoOpOnCompletionActions(), + testingFatalErrorHandler, + JobMasterTest.class.getClassLoader(), + null, + null); + + CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); + try { // wait for the start to complete startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); // register task manager will trigger monitor heartbeat target, schedule heartbeat request at interval time - CompletableFuture<RegistrationResponse> registrationResponse = jobMasterGateway - .registerTaskManager(taskManagerAddress, taskManagerLocation, testingTimeout); + CompletableFuture<RegistrationResponse> registrationResponse = jobMasterGateway.registerTaskManager( + taskExecutorGateway.getAddress(), + taskManagerLocation, + testingTimeout); // wait for the completion of the registration registrationResponse.get(); @@ -145,39 +202,17 @@ public class JobMasterTest extends TestLogger { final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID())); - - // check if a concurrent error occurred - testingFatalErrorHandler.rethrowError(); - } finally { jobManagerSharedServices.shutdown(); - rpc.stopService(); + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @Test public void testHeartbeatTimeoutWithResourceManager() throws Exception { final String 
resourceManagerAddress = "rm"; - final String jobManagerAddress = "jm"; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); - final ResourceID jmResourceId = new ResourceID(jobManagerAddress); - final JobGraph jobGraph = new JobGraph(); - - final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService( - null, - null); - haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); - haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); - - final TestingRpcService rpc = new TestingRpcService(); - - final long heartbeatInterval = 1L; - final long heartbeatTimeout = 5L; - final ScheduledExecutor scheduledExecutor = rpc.getScheduledExecutor(); - final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway( resourceManagerId, @@ -197,36 +232,30 @@ public class JobMasterTest extends TestLogger { resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0)); - rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); - - final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - - Configuration configuration = new Configuration(); + rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); - try (BlobServer blobServer = new BlobServer(configuration, new 
VoidBlobStore())) { - blobServer.start(); - - final JobMaster jobMaster = new JobMaster( - rpc, - jobMasterConfiguration, - jmResourceId, - jobGraph, - haServices, - jobManagerSharedServices, - heartbeatServices, - blobServer, - null, - new NoOpOnCompletionActions(), - testingFatalErrorHandler, - FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()), - null, - null); - - CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); + final JobMaster jobMaster = new JobMaster( + rpcService, + jobMasterConfiguration, + jmResourceId, + jobGraph, + haServices, + jobManagerSharedServices, + fastHeartbeatServices, + blobServer, + null, + new NoOpOnCompletionActions(), + testingFatalErrorHandler, + JobMasterTest.class.getClassLoader(), + null, + null); + + CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); + try { // wait for the start operation to complete startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -246,13 +275,9 @@ public class JobMasterTest extends TestLogger { // heartbeat timeout should trigger disconnect JobManager from ResourceManager assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID())); - - // check if a concurrent error occurred - testingFatalErrorHandler.rethrowError(); - } finally { jobManagerSharedServices.shutdown(); - rpc.stopService(); + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } }
