This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f40783f48a6ccef9c609ac8204437e00033b76c
Author: 陈梓立 <wander4...@gmail.com>
AuthorDate: Mon Aug 6 16:09:43 2018 +0800

    [FLINK-10099][test] Improve YarnResourceManagerTest
    
    Introduce methods to mock a Yarn Container and ContainerStatus.
    
    Properly shut down a started ResourceManager.
    
    This closes #6499.
---
 .../apache/flink/yarn/YarnResourceManagerTest.java | 329 ++++++++++++---------
 1 file changed, 181 insertions(+), 148 deletions(-)

diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb8e968..a7d4f43 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -54,6 +54,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
@@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -115,21 +117,28 @@ public class YarnResourceManagerTest extends TestLogger {
 
        private static final Time TIMEOUT = Time.seconds(10L);
 
-       private Configuration flinkConfig = new Configuration();
+       private Configuration flinkConfig;
 
-       private Map<String, String> env = new HashMap<>();
+       private Map<String, String> env;
+
+       private TestingFatalErrorHandler testingFatalErrorHandler;
 
        @Rule
        public TemporaryFolder folder = new TemporaryFolder();
 
        @Before
        public void setup() {
+               testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+               flinkConfig = new Configuration();
                
flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 
100);
+
                File root = folder.getRoot();
                File home = new File(root, "home");
                boolean created = home.mkdir();
                assertTrue(created);
 
+               env = new HashMap<>();
                env.put(ENV_APP_ID, "foo");
                env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
                env.put(ENV_CLIENT_SHIP_FILES, "");
@@ -139,15 +148,21 @@ public class YarnResourceManagerTest extends TestLogger {
        }
 
        @After
-       public void teardown() {
-               env.clear();
+       public void teardown() throws Exception {
+               if (testingFatalErrorHandler != null) {
+                       testingFatalErrorHandler.rethrowError();
+               }
+
+               if (env != null) {
+                       env.clear();
+               }
        }
 
        static class TestingYarnResourceManager extends YarnResourceManager {
-               public AMRMClientAsync<AMRMClient.ContainerRequest> 
mockResourceManagerClient;
-               public NMClient mockNMClient;
+               AMRMClientAsync<AMRMClient.ContainerRequest> 
mockResourceManagerClient;
+               NMClient mockNMClient;
 
-               public TestingYarnResourceManager(
+               TestingYarnResourceManager(
                                RpcService rpcService,
                                String resourceManagerEndpointId,
                                ResourceID resourceId,
@@ -181,11 +196,11 @@ public class YarnResourceManagerTest extends TestLogger {
                        this.mockResourceManagerClient = 
mockResourceManagerClient;
                }
 
-               public <T> CompletableFuture<T> runInMainThread(Callable<T> 
callable) {
+               <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
                        return callAsync(callable, TIMEOUT);
                }
 
-               public MainThreadExecutor getMainThreadExecutorForTesting() {
+               MainThreadExecutor getMainThreadExecutorForTesting() {
                        return super.getMainThreadExecutor();
                }
 
@@ -193,7 +208,7 @@ public class YarnResourceManagerTest extends TestLogger {
                protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
                                YarnConfiguration yarnConfiguration,
                                int yarnHeartbeatIntervalMillis,
-                               @Nullable String webInteraceUrl) {
+                               @Nullable String webInterfaceUrl) {
                        return mockResourceManagerClient;
                }
 
@@ -213,7 +228,6 @@ public class YarnResourceManagerTest extends TestLogger {
 
                // services
                final TestingRpcService rpcService;
-               final TestingFatalErrorHandler fatalErrorHandler;
                final MockResourceManagerRuntimeServices rmServices;
 
                // RM
@@ -240,7 +254,6 @@ public class YarnResourceManagerTest extends TestLogger {
                 */
                Context() throws Exception {
                        rpcService = new TestingRpcService();
-                       fatalErrorHandler = new TestingFatalErrorHandler();
                        rmServices = new MockResourceManagerRuntimeServices();
 
                        // resource manager
@@ -258,7 +271,7 @@ public class YarnResourceManagerTest extends TestLogger {
                                                        
rmServices.metricRegistry,
                                                        
rmServices.jobLeaderIdService,
                                                        new 
ClusterInformation("localhost", 1234),
-                                                       fatalErrorHandler,
+                                                       
testingFatalErrorHandler,
                                                        null,
                                                        
mockResourceManagerClient,
                                                        mockNMClient);
@@ -269,15 +282,15 @@ public class YarnResourceManagerTest extends TestLogger {
                 */
                class MockResourceManagerRuntimeServices {
 
-                       public final ScheduledExecutor scheduledExecutor;
-                       public final TestingHighAvailabilityServices 
highAvailabilityServices;
-                       public final HeartbeatServices heartbeatServices;
-                       public final MetricRegistry metricRegistry;
-                       public final TestingLeaderElectionService 
rmLeaderElectionService;
-                       public final JobLeaderIdService jobLeaderIdService;
-                       public final SlotManager slotManager;
+                       private final ScheduledExecutor scheduledExecutor;
+                       private final TestingHighAvailabilityServices 
highAvailabilityServices;
+                       private final HeartbeatServices heartbeatServices;
+                       private final MetricRegistry metricRegistry;
+                       private final TestingLeaderElectionService 
rmLeaderElectionService;
+                       private final JobLeaderIdService jobLeaderIdService;
+                       private final SlotManager slotManager;
 
-                       public UUID rmLeaderSessionId;
+                       private UUID rmLeaderSessionId;
 
                        MockResourceManagerRuntimeServices() throws Exception {
                                scheduledExecutor = 
mock(ScheduledExecutor.class);
@@ -295,7 +308,7 @@ public class YarnResourceManagerTest extends TestLogger {
                                                Time.minutes(5L));
                        }
 
-                       public void grantLeadership() throws Exception {
+                       void grantLeadership() throws Exception {
                                rmLeaderSessionId = UUID.randomUUID();
                                
rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(),
 TimeUnit.MILLISECONDS);
                        }
@@ -304,7 +317,7 @@ public class YarnResourceManagerTest extends TestLogger {
                /**
                 * Start the resource manager and grant leadership to it.
                 */
-               public void startResourceManager() throws Exception {
+               void startResourceManager() throws Exception {
                        resourceManager.start();
                        rmServices.grantLeadership();
                }
@@ -312,93 +325,129 @@ public class YarnResourceManagerTest extends TestLogger {
                /**
                 * Stop the Akka actor system.
                 */
-               public void stopResourceManager() throws Exception {
+               void stopResourceManager() throws Exception {
                        rpcService.stopService().get();
                }
-       }
 
-       @Test
-       public void testStopWorker() throws Exception {
-               new Context() {{
+               /**
+                * A wrapper function for running test. Deal with setup and 
teardown logic
+                * in Context.
+                * @param testMethod the real test body.
+                */
+               void runTest(RunnableWithException testMethod) throws Exception 
{
                        startResourceManager();
-                       // Request slot from SlotManager.
-                       CompletableFuture<?> registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
-                               rmServices.slotManager.registerSlotRequest(
-                                       new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, taskHost));
-                               return null;
-                       });
+                       try {
+                               testMethod.run();
+                       } finally {
+                               stopResourceManager();
+                       }
+               }
+       }
 
-                       // wait for the registerSlotRequest completion
-                       registerSlotRequestFuture.get();
-
-                       // Callback from YARN when container is allocated.
-                       Container testingContainer = mock(Container.class);
-                       when(testingContainer.getId()).thenReturn(
-                               ContainerId.newInstance(
-                                       ApplicationAttemptId.newInstance(
-                                               
ApplicationId.newInstance(System.currentTimeMillis(), 1),
-                                               1),
-                                       1));
-                       
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
-                       
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-                       
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
-                       
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
-                       
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-                       
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
-
-                       // Remote task executor registers with 
YarnResourceManager.
-                       TaskExecutorGateway mockTaskExecutorGateway = 
mock(TaskExecutorGateway.class);
-                       rpcService.registerGateway(taskHost, 
mockTaskExecutorGateway);
-
-                       final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
-
-                       final ResourceID taskManagerResourceId = new 
ResourceID(testingContainer.getId().toString());
-                       final SlotReport slotReport = new SlotReport(
-                               new SlotStatus(
-                                       new SlotID(taskManagerResourceId, 1),
-                                       new ResourceProfile(10, 1, 1, 1, 0, 
Collections.emptyMap())));
-
-                       CompletableFuture<Integer> numberRegisteredSlotsFuture 
= rmGateway
-                               .registerTaskExecutor(
-                                       taskHost,
-                                       taskManagerResourceId,
-                                       dataPort,
-                                       hardwareDescription,
-                                       Time.seconds(10L))
-                               .thenCompose(
-                                       (RegistrationResponse response) -> {
-                                               assertThat(response, 
instanceOf(TaskExecutorRegistrationSuccess.class));
-                                               final 
TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) 
response;
-                                               return rmGateway.sendSlotReport(
-                                                       taskManagerResourceId,
-                                                       
success.getRegistrationId(),
-                                                       slotReport,
-                                                       Time.seconds(10L));
-                                       })
-                               .handleAsync(
-                                       (Acknowledge ignored, Throwable 
throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
-                                       
resourceManager.getMainThreadExecutorForTesting());
-
-                       final int numberRegisteredSlots = 
numberRegisteredSlotsFuture.get();
-
-                       assertEquals(1, numberRegisteredSlots);
-
-                       // Unregister all task executors and release all 
containers.
-                       CompletableFuture<?> unregisterAndReleaseFuture =  
resourceManager.runInMainThread(() -> {
-                               
rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
-                               return null;
-                       });
+       private static Container mockContainer(String host, int port, int 
containerId) {
+               Container mockContainer = mock(Container.class);
+
+               NodeId mockNodeId = NodeId.newInstance(host, port);
+               ContainerId mockContainerId = ContainerId.newInstance(
+                       ApplicationAttemptId.newInstance(
+                               
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                               1
+                       ),
+                       containerId
+               );
+
+               when(mockContainer.getId()).thenReturn(mockContainerId);
+               when(mockContainer.getNodeId()).thenReturn(mockNodeId);
+               
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+               
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+               return mockContainer;
+       }
+
+       private static ContainerStatus mockContainerStatus(ContainerId 
containerId) {
+               ContainerStatus mockContainerStatus = 
mock(ContainerStatus.class);
 
-                       unregisterAndReleaseFuture.get();
+               
when(mockContainerStatus.getContainerId()).thenReturn(containerId);
+               
when(mockContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+               when(mockContainerStatus.getDiagnostics()).thenReturn("Test 
exit");
+               when(mockContainerStatus.getExitStatus()).thenReturn(-1);
 
-                       
verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
-                       
verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+               return mockContainerStatus;
+       }
 
-                       stopResourceManager();
+       @Test
+       public void testStopWorker() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               // Request slot from SlotManager.
+                               CompletableFuture<?> registerSlotRequestFuture 
= resourceManager.runInMainThread(() -> {
+                                       
rmServices.slotManager.registerSlotRequest(
+                                               new SlotRequest(new JobID(), 
new AllocationID(), resourceProfile1, taskHost));
+                                       return null;
+                               });
+
+                               // wait for the registerSlotRequest completion
+                               registerSlotRequestFuture.get();
+
+                               // Callback from YARN when container is 
allocated.
+                               Container testingContainer = 
mockContainer("container", 1234, 1);
+
+                               
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+                               
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                               
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
+
+                               // Remote task executor registers with 
YarnResourceManager.
+                               TaskExecutorGateway mockTaskExecutorGateway = 
mock(TaskExecutorGateway.class);
+                               rpcService.registerGateway(taskHost, 
mockTaskExecutorGateway);
+
+                               final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+                               final ResourceID taskManagerResourceId = new 
ResourceID(testingContainer.getId().toString());
+                               final SlotReport slotReport = new SlotReport(
+                                       new SlotStatus(
+                                               new 
SlotID(taskManagerResourceId, 1),
+                                               new ResourceProfile(10, 1, 1, 
1, 0, Collections.emptyMap())));
+
+                               CompletableFuture<Integer> 
numberRegisteredSlotsFuture = rmGateway
+                                       .registerTaskExecutor(
+                                               taskHost,
+                                               taskManagerResourceId,
+                                               dataPort,
+                                               hardwareDescription,
+                                               Time.seconds(10L))
+                                       .thenCompose(
+                                               (RegistrationResponse response) 
-> {
+                                                       assertThat(response, 
instanceOf(TaskExecutorRegistrationSuccess.class));
+                                                       final 
TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) 
response;
+                                                       return 
rmGateway.sendSlotReport(
+                                                               
taskManagerResourceId,
+                                                               
success.getRegistrationId(),
+                                                               slotReport,
+                                                               
Time.seconds(10L));
+                                               })
+                                       .handleAsync(
+                                               (Acknowledge ignored, Throwable 
throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+                                               
resourceManager.getMainThreadExecutorForTesting());
+
+                               final int numberRegisteredSlots = 
numberRegisteredSlotsFuture.get();
+
+                               assertEquals(1, numberRegisteredSlots);
+
+                               // Unregister all task executors and release 
all containers.
+                               CompletableFuture<?> unregisterAndReleaseFuture 
= resourceManager.runInMainThread(() -> {
+                                       
rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
+                                       return null;
+                               });
+
+                               unregisterAndReleaseFuture.get();
+
+                               
verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+                               
verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+                       });
 
                        // It's now safe to access the SlotManager state since 
the ResourceManager has been stopped.
-                       
assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0);
-                       
assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
+                       
assertThat(rmServices.slotManager.getNumberRegisteredSlots(), 
Matchers.equalTo(0));
+                       
assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), 
Matchers.equalTo(0));
                }};
        }
 
@@ -411,65 +460,49 @@ public class YarnResourceManagerTest extends TestLogger {
                        final File applicationDir = folder.newFolder(".flink");
                        env.put(FLINK_YARN_FILES, 
applicationDir.getCanonicalPath());
 
-                       startResourceManager();
-
-                       
resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
-                       assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
+                       runTest(() -> {
+                               
resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+                               assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
+                       });
                }};
        }
 
        /**
         * Tests that YarnResourceManager will not request more containers than 
needs during
         * callback from Yarn when container is Completed.
-        * @throws Exception
         */
        @Test
        public void testOnContainerCompleted() throws Exception {
                new Context() {{
-                       startResourceManager();
-                       CompletableFuture<?> registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
-                               rmServices.slotManager.registerSlotRequest(
-                                       new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, taskHost));
-                               return null;
+                       runTest(() -> {
+                               CompletableFuture<?> registerSlotRequestFuture 
= resourceManager.runInMainThread(() -> {
+                                       
rmServices.slotManager.registerSlotRequest(
+                                               new SlotRequest(new JobID(), 
new AllocationID(), resourceProfile1, taskHost));
+                                       return null;
+                               });
+
+                               // wait for the registerSlotRequest completion
+                               registerSlotRequestFuture.get();
+
+                               // Callback from YARN when container is 
allocated.
+                               Container testingContainer = 
mockContainer("container", 1234, 1);
+
+                               
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+                               
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                               
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
+
+                               // Callback from YARN when container is 
Completed, pending request can not be fulfilled by pending
+                               // containers, need to request new container.
+                               ContainerStatus testingContainerStatus = 
mockContainerStatus(testingContainer.getId());
+
+                               
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+                               verify(mockResourceManagerClient, 
times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
+                               // Callback from YARN when container is 
Completed happened before global fail, pending request
+                               // slot is already fulfilled by pending 
containers, no need to request new container.
+                               
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+                               verify(mockResourceManagerClient, 
times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
                        });
-
-                       // wait for the registerSlotRequest completion
-                       registerSlotRequestFuture.get();
-
-                       ContainerId testContainerId = ContainerId.newInstance(
-                               ApplicationAttemptId.newInstance(
-                                       
ApplicationId.newInstance(System.currentTimeMillis(), 1),
-                                       1),
-                               1);
-
-                       // Callback from YARN when container is allocated.
-                       Container testingContainer = mock(Container.class);
-                       
when(testingContainer.getId()).thenReturn(testContainerId);
-                       
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
-                       
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-                       
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
-                       
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
-                       
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-                       
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
-
-                       // Callback from YARN when container is Completed, 
pending request can not be fulfilled by pending
-                       // containers, need to request new container.
-                       ContainerStatus testingContainerStatus = 
mock(ContainerStatus.class);
-                       
when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
-                       
when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
-                       
when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
-                       
when(testingContainerStatus.getExitStatus()).thenReturn(-1);
-                       
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
-                       verify(mockResourceManagerClient, 
times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-
-                       // Callback from YARN when container is Completed 
happened before global fail, pending request
-                       // slot is already fulfilled by pending containers, no 
need to request new container.
-                       
when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
-                       
when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
-                       
when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
-                       
when(testingContainerStatus.getExitStatus()).thenReturn(-1);
-                       
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
-                       verify(mockResourceManagerClient, 
times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
                }};
        }
 }

Reply via email to