[tests] Harden TaskExecutorTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/235a1696 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/235a1696 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/235a1696 Branch: refs/heads/master Commit: 235a169691bd3c3ff2a25b7a7763c6900a0f2c6c Parents: 368d0da Author: Stephan Ewen <[email protected]> Authored: Fri Dec 23 19:21:43 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:28 2016 +0100 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutorTest.java | 77 +++++++++++++++----- 1 file changed, 57 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/235a1696/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 2af97b5..aacd329 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -21,10 +21,10 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; @@ -75,14 +75,11 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; - import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; - import org.junit.rules.TestName; import org.mockito.Matchers; -import org.powermock.api.mockito.PowerMockito; import java.net.InetAddress; import java.net.URL; @@ -90,9 +87,18 @@ import java.util.Arrays; import java.util.Collections; import java.util.UUID; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TaskExecutorTest extends TestLogger { @@ -107,10 +113,16 @@ public class TaskExecutorTest extends TestLogger { final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { + final FatalErrorHandler errorHandler = mock(FatalErrorHandler.class); + // register a mock resource manager gateway ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); + when(rmGateway.registerTaskExecutor( + any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success())); + TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); - PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); + when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); rpc.registerGateway(resourceManagerAddress, rmGateway); @@ -123,6 +135,8 @@ public class TaskExecutorTest extends TestLogger { final SlotReport slotReport = new SlotReport(); when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, taskManagerLocation, @@ -138,13 +152,16 @@ public class TaskExecutorTest extends TestLogger { taskSlotTable, mock(JobManagerTable.class), mock(JobLeaderService.class), - mock(FatalErrorHandler.class)); + testingFatalErrorHandler); taskManager.start(); String taskManagerAddress = taskManager.getAddress(); verify(rmGateway).registerTaskExecutor( any(UUID.class), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class)); + + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); } finally { rpc.stopService(); @@ -165,6 +182,14 @@ public class TaskExecutorTest extends TestLogger { // register the mock resource manager gateways ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class); ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class); + + when(rmGateway1.registerTaskExecutor( + any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success())); + when(rmGateway2.registerTaskExecutor( + any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success())); + rpc.registerGateway(address1, rmGateway1); rpc.registerGateway(address2, rmGateway2); @@ -174,9 +199,9 @@ public class TaskExecutorTest extends TestLogger { haServices.setResourceManagerLeaderRetriever(testLeaderService); TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); - PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); - PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); - PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]); + when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); + when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); + when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); @@ -186,6 +211,8 @@ public class TaskExecutorTest extends TestLogger { final SlotReport slotReport = new SlotReport(); when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, taskManagerLocation, @@ -201,7 +228,7 @@ public class TaskExecutorTest extends TestLogger { taskSlotTable, mock(JobManagerTable.class), mock(JobLeaderService.class), - mock(FatalErrorHandler.class)); + testingFatalErrorHandler); taskManager.start(); String taskManagerAddress = taskManager.getAddress(); @@ -225,6 +252,9 @@ public class TaskExecutorTest extends TestLogger { verify(rmGateway2).registerTaskExecutor( eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); + + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); } finally { rpc.stopService(); @@ -310,6 +340,7 @@ public class TaskExecutorTest extends TestLogger { when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class)); try { + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); TaskExecutor taskManager = new TaskExecutor( taskManagerConfiguration, @@ -326,7 +357,7 @@ public class TaskExecutorTest extends TestLogger { taskSlotTable, jobManagerTable, mock(JobLeaderService.class), - mock(FatalErrorHandler.class)); + testingFatalErrorHandler); taskManager.start(); @@ -336,6 +367,8 @@ public class TaskExecutorTest extends TestLogger { completionFuture.get(); + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); } finally { rpc.stopService(); } @@ -452,10 +485,10 @@ public class TaskExecutorTest extends TestLogger { (Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)), eq(jobManagerLeaderId), any(Time.class)); - } finally { + // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); - + } finally { rpc.stopService(); } } @@ -559,10 +592,10 @@ public class TaskExecutorTest extends TestLogger { assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1)); assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2)); assertTrue(taskSlotTable.isSlotFree(1)); - } finally { + // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); - + } finally { rpc.stopService(); } } @@ -595,11 +628,13 @@ public class TaskExecutorTest extends TestLogger { haServices.setResourceManagerLeaderRetriever(testLeaderService); TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); - PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); + when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, taskManagerLocation, @@ -615,7 +650,7 @@ public class TaskExecutorTest extends TestLogger { mock(TaskSlotTable.class), mock(JobManagerTable.class), mock(JobLeaderService.class), - mock(FatalErrorHandler.class)); + testingFatalErrorHandler); taskManager.start(); String taskManagerAddress = taskManager.getAddress(); @@ -653,6 +688,8 @@ public class TaskExecutorTest extends TestLogger { taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered); + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); } finally { rpc.stopService();
