[tests] Harden TaskExecutorTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/235a1696
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/235a1696
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/235a1696

Branch: refs/heads/master
Commit: 235a169691bd3c3ff2a25b7a7763c6900a0f2c6c
Parents: 368d0da
Author: Stephan Ewen <[email protected]>
Authored: Fri Dec 23 19:21:43 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:28 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutorTest.java  | 77 +++++++++++++++-----
 1 file changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/235a1696/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2af97b5..aacd329 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -75,14 +75,11 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
-
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
-import org.powermock.api.mockito.PowerMockito;
 
 import java.net.InetAddress;
 import java.net.URL;
@@ -90,9 +87,18 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
 import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
 
@@ -107,10 +113,16 @@ public class TaskExecutorTest extends TestLogger {
 
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
                try {
+                       final FatalErrorHandler errorHandler = 
mock(FatalErrorHandler.class);
+
                        // register a mock resource manager gateway
                        ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
+                       when(rmGateway.registerTaskExecutor(
+                                       any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+                               
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
RegistrationResponse.Success()));
+
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-                       
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+                       
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 
                        rpc.registerGateway(resourceManagerAddress, rmGateway);
 
@@ -123,6 +135,8 @@ public class TaskExecutorTest extends TestLogger {
                        final SlotReport slotReport = new SlotReport();
                        
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
+                       final TestingFatalErrorHandler testingFatalErrorHandler 
= new TestingFatalErrorHandler();
+
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,
                                taskManagerLocation,
@@ -138,13 +152,16 @@ public class TaskExecutorTest extends TestLogger {
                                taskSlotTable,
                                mock(JobManagerTable.class),
                                mock(JobLeaderService.class),
-                               mock(FatalErrorHandler.class));
+                               testingFatalErrorHandler);
 
                        taskManager.start();
                        String taskManagerAddress = taskManager.getAddress();
 
                        verify(rmGateway).registerTaskExecutor(
                                        any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
                }
                finally {
                        rpc.stopService();
@@ -165,6 +182,14 @@ public class TaskExecutorTest extends TestLogger {
                        // register the mock resource manager gateways
                        ResourceManagerGateway rmGateway1 = 
mock(ResourceManagerGateway.class);
                        ResourceManagerGateway rmGateway2 = 
mock(ResourceManagerGateway.class);
+
+                       when(rmGateway1.registerTaskExecutor(
+                                       any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+                                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
RegistrationResponse.Success()));
+                       when(rmGateway2.registerTaskExecutor(
+                                       any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+                                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
RegistrationResponse.Success()));
+
                        rpc.registerGateway(address1, rmGateway1);
                        rpc.registerGateway(address2, rmGateway2);
 
@@ -174,9 +199,9 @@ public class TaskExecutorTest extends TestLogger {
                        
haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-                       
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-                       
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new
 Configuration());
-                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new
 String[1]);
+                       
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+                       
when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new 
Configuration());
+                       
when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new 
String[1]);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
@@ -186,6 +211,8 @@ public class TaskExecutorTest extends TestLogger {
                        final SlotReport slotReport = new SlotReport();
                        
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
+                       final TestingFatalErrorHandler testingFatalErrorHandler 
= new TestingFatalErrorHandler();
+
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,
                                taskManagerLocation,
@@ -201,7 +228,7 @@ public class TaskExecutorTest extends TestLogger {
                                taskSlotTable,
                                mock(JobManagerTable.class),
                                mock(JobLeaderService.class),
-                               mock(FatalErrorHandler.class));
+                               testingFatalErrorHandler);
 
                        taskManager.start();
                        String taskManagerAddress = taskManager.getAddress();
@@ -225,6 +252,9 @@ public class TaskExecutorTest extends TestLogger {
                        verify(rmGateway2).registerTaskExecutor(
                                        eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), eq(slotReport), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
+
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
                }
                finally {
                        rpc.stopService();
@@ -310,6 +340,7 @@ public class TaskExecutorTest extends TestLogger {
                
when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
 
                try {
+                       final TestingFatalErrorHandler testingFatalErrorHandler 
= new TestingFatalErrorHandler();
 
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerConfiguration,
@@ -326,7 +357,7 @@ public class TaskExecutorTest extends TestLogger {
                                taskSlotTable,
                                jobManagerTable,
                                mock(JobLeaderService.class),
-                               mock(FatalErrorHandler.class));
+                               testingFatalErrorHandler);
 
                        taskManager.start();
 
@@ -336,6 +367,8 @@ public class TaskExecutorTest extends TestLogger {
 
                        completionFuture.get();
 
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
                } finally {
                        rpc.stopService();
                }
@@ -452,10 +485,10 @@ public class TaskExecutorTest extends TestLogger {
                                        
(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
                                        eq(jobManagerLeaderId),
                                        any(Time.class));
-               } finally {
+
                        // check if a concurrent error occurred
                        testingFatalErrorHandler.rethrowError();
-
+               } finally {
                        rpc.stopService();
                }
        }
@@ -559,10 +592,10 @@ public class TaskExecutorTest extends TestLogger {
                        assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId1));
                        assertFalse(taskSlotTable.existsActiveSlot(jobId, 
allocationId2));
                        assertTrue(taskSlotTable.isSlotFree(1));
-               } finally {
+
                        // check if a concurrent error occurred
                        testingFatalErrorHandler.rethrowError();
-
+               } finally {
                        rpc.stopService();
                }
        }
@@ -595,11 +628,13 @@ public class TaskExecutorTest extends TestLogger {
                        
haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-                       
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+                       
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
 
+                       final TestingFatalErrorHandler testingFatalErrorHandler 
= new TestingFatalErrorHandler();
+
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,
                                taskManagerLocation,
@@ -615,7 +650,7 @@ public class TaskExecutorTest extends TestLogger {
                                mock(TaskSlotTable.class),
                                mock(JobManagerTable.class),
                                mock(JobLeaderService.class),
-                               mock(FatalErrorHandler.class));
+                               testingFatalErrorHandler);
 
                        taskManager.start();
                        String taskManagerAddress = taskManager.getAddress();
@@ -653,6 +688,8 @@ public class TaskExecutorTest extends TestLogger {
                                taskManager.requestSlot(unconfirmedFreeSlotID, 
jobId, new AllocationID(), jobManagerAddress, leaderId);
                        assertTrue(tmSlotRequestReply3 instanceof 
TMSlotRequestRegistered);
 
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
                }
                finally {
                        rpc.stopService();

Reply via email to