[hotfix] [tests] Simplify JobMasterTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e85bb07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e85bb07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e85bb07

Branch: refs/heads/master
Commit: 9e85bb0780978e08f00da9c7bc4529b4dd2abafd
Parents: d8a8866
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 13 15:16:27 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Feb 18 10:12:55 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 227 ++++++++++---------
 1 file changed, 126 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e85bb07/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e8f0bc4..e142d9c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -39,6 +38,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -48,10 +48,16 @@ import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 
-import java.net.URL;
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -63,23 +69,85 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @Category(Flip6.class)
 public class JobMasterTest extends TestLogger {
 
-       private final Time testingTimeout = Time.seconds(10L);
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       @Test
-       public void testHeartbeatTimeoutWithTaskManager() throws Exception {
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+       private static final Time testingTimeout = Time.seconds(10L);
+
+       private static final long heartbeatInterval = 1L;
+
+       private static final long heartbeatTimeout = 5L;
+
+       private static final JobGraph jobGraph = new JobGraph();
+
+       private static TestingRpcService rpcService;
+
+       private static HeartbeatServices fastHeartbeatServices;
+
+       private BlobServer blobServer;
+
+       private Configuration configuration;
+
+       private ResourceID jmResourceId;
+
+       private JobMasterId jobMasterId;
+
+       private TestingHighAvailabilityServices haServices;
+
+       private TestingLeaderRetrievalService rmLeaderRetrievalService;
+
+       private TestingFatalErrorHandler testingFatalErrorHandler;
+
+       @BeforeClass
+       public static void setupClass() {
+               rpcService = new TestingRpcService();
+
+               fastHeartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
+       }
+
+       @Before
+       public void setup() throws IOException {
+               configuration = new Configuration();
+               haServices = new TestingHighAvailabilityServices();
+               jobMasterId = JobMasterId.generate();
+               jmResourceId = ResourceID.generate();
+
+               testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+               haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
+
+               rmLeaderRetrievalService = new TestingLeaderRetrievalService(
                        null,
                        null);
                
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-               haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
-               final String jobManagerAddress = "jm";
-               final JobMasterId jobMasterId = JobMasterId.generate();
-               final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
+               configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+               blobServer = new BlobServer(configuration, new VoidBlobStore());
+
+               blobServer.start();
+       }
+
+       @After
+       public void teardown() throws Exception {
+               if (testingFatalErrorHandler != null) {
+                       testingFatalErrorHandler.rethrowError();
+               }
 
-               final String taskManagerAddress = "tm";
+               if (blobServer != null) {
+                       blobServer.close();
+               }
+       }
+
+       @AfterClass
+       public static void teardownClass() {
+               if (rpcService != null) {
+                       rpcService.stopService();
+                       rpcService = null;
+               }
+       }
+
+       @Test
+       public void testHeartbeatTimeoutWithTaskManager() throws Exception {
                final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
                final TestingTaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGateway();
 
@@ -89,51 +157,40 @@ public class JobMasterTest extends TestLogger {
                
taskExecutorGateway.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete);
                taskExecutorGateway.setDisconnectJobManagerConsumer(tuple -> 
disconnectedJobManagerFuture.complete(tuple.f0));
 
-               final TestingRpcService rpc = new TestingRpcService();
-               rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
-
-               final long heartbeatInterval = 1L;
-               final long heartbeatTimeout = 5L;
-
-               final ScheduledExecutor scheduledExecutor = 
rpc.getScheduledExecutor();
-               final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
-
-               final JobGraph jobGraph = new JobGraph();
-
-               Configuration configuration = new Configuration();
+               rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
 
                final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
                final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
 
-               try (BlobServer blobServer = new BlobServer(configuration, new 
VoidBlobStore())) {
-                       blobServer.start();
-
-                       final JobMaster jobMaster = new JobMaster(
-                               rpc,
-                               jobMasterConfiguration,
-                               jmResourceId,
-                               jobGraph,
-                               haServices,
-                               jobManagerSharedServices,
-                               heartbeatServices,
-                               blobServer,
-                               null,
-                               new NoOpOnCompletionActions(),
-                               testingFatalErrorHandler,
-                               FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()),
-                               null,
-                               null);
-
-                       CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+               final JobMaster jobMaster = new JobMaster(
+                       rpcService,
+                       jobMasterConfiguration,
+                       jmResourceId,
+                       jobGraph,
+                       haServices,
+                       jobManagerSharedServices,
+                       fastHeartbeatServices,
+                       blobServer,
+                       null,
+                       new NoOpOnCompletionActions(),
+                       testingFatalErrorHandler,
+                       JobMasterTest.class.getClassLoader(),
+                       null,
+                       null);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
+               try {
                        // wait for the start to complete
                        startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
                        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
 
                        // register task manager will trigger monitor heartbeat target, schedule heartbeat request at interval time
-                       CompletableFuture<RegistrationResponse> 
registrationResponse = jobMasterGateway
-                               .registerTaskManager(taskManagerAddress, 
taskManagerLocation, testingTimeout);
+                       CompletableFuture<RegistrationResponse> 
registrationResponse = jobMasterGateway.registerTaskManager(
+                               taskExecutorGateway.getAddress(),
+                               taskManagerLocation,
+                               testingTimeout);
 
                        // wait for the completion of the registration
                        registrationResponse.get();
@@ -145,39 +202,17 @@ public class JobMasterTest extends TestLogger {
                        final JobID disconnectedJobManager = 
disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
                        assertThat(disconnectedJobManager, 
Matchers.equalTo(jobGraph.getJobID()));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
-
                } finally {
                        jobManagerSharedServices.shutdown();
-                       rpc.stopService();
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
                }
        }
 
        @Test
        public void testHeartbeatTimeoutWithResourceManager() throws Exception {
                final String resourceManagerAddress = "rm";
-               final String jobManagerAddress = "jm";
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
-               final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
-               final JobGraph jobGraph = new JobGraph();
-
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
-                       null,
-                       null);
-               
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-               haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
-
-               final TestingRpcService rpc = new TestingRpcService();
-
-               final long heartbeatInterval = 1L;
-               final long heartbeatTimeout = 5L;
-               final ScheduledExecutor scheduledExecutor = 
rpc.getScheduledExecutor();
-               final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
 
                final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway(
                        resourceManagerId,
@@ -197,36 +232,30 @@ public class JobMasterTest extends TestLogger {
 
                resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> 
disconnectedJobManagerFuture.complete(tuple.f0));
 
-               rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
-
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
-               Configuration configuration = new Configuration();
+               rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
 
                final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
                final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
 
-               try (BlobServer blobServer = new BlobServer(configuration, new 
VoidBlobStore())) {
-                       blobServer.start();
-
-                       final JobMaster jobMaster = new JobMaster(
-                               rpc,
-                               jobMasterConfiguration,
-                               jmResourceId,
-                               jobGraph,
-                               haServices,
-                               jobManagerSharedServices,
-                               heartbeatServices,
-                               blobServer,
-                               null,
-                               new NoOpOnCompletionActions(),
-                               testingFatalErrorHandler,
-                               FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()),
-                               null,
-                               null);
-
-                       CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+               final JobMaster jobMaster = new JobMaster(
+                       rpcService,
+                       jobMasterConfiguration,
+                       jmResourceId,
+                       jobGraph,
+                       haServices,
+                       jobManagerSharedServices,
+                       fastHeartbeatServices,
+                       blobServer,
+                       null,
+                       new NoOpOnCompletionActions(),
+                       testingFatalErrorHandler,
+                       JobMasterTest.class.getClassLoader(),
+                       null,
+                       null);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
+               try {
                        // wait for the start operation to complete
                        startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
@@ -246,13 +275,9 @@ public class JobMasterTest extends TestLogger {
 
                        // heartbeat timeout should trigger disconnect JobManager from ResourceManager
                        assertThat(disconnectedJobManager, 
Matchers.equalTo(jobGraph.getJobID()));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
-
                } finally {
                        jobManagerSharedServices.shutdown();
-                       rpc.stopService();
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
                }
        }
 

Reply via email to