Repository: flink
Updated Branches:
  refs/heads/master 9f790d3ef -> d95d20eb4


http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a4f0e03..b5a3c80 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -74,6 +75,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Ignore;
@@ -89,6 +91,7 @@ import org.slf4j.Logger;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -106,6 +109,8 @@ import static org.mockito.Mockito.*;
 
 public class TaskExecutorTest extends TestLogger {
 
+       private final Time timeout = Time.milliseconds(10000L);
+
        @Rule
        public TestName name = new TestName();
 
@@ -426,7 +431,7 @@ public class TaskExecutorTest extends TestLogger {
 
                        verify(heartbeatManager, 
timeout(timeout)).monitorTarget(any(ResourceID.class), 
any(HeartbeatTarget.class));
 
-                       TaskExecutorGateway taskExecutorGateway = 
taskManager.getSelf();
+                       TaskExecutorGateway taskExecutorGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
 
                        // trigger the heartbeat asynchronously
                        
taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);
@@ -723,7 +728,7 @@ public class TaskExecutorTest extends TestLogger {
 
                        taskManager.start();
 
-                       taskManager.submitTask(tdd, jobManagerLeaderId);
+                       taskManager.submitTask(tdd, jobManagerLeaderId, 
timeout);
 
                        CompletableFuture<Boolean> completionFuture = 
TestInvokable.completableFuture;
 
@@ -844,7 +849,7 @@ public class TaskExecutorTest extends TestLogger {
                        
resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, 
resourceManagerLeaderId);
 
                        // request slots from the task manager under the given 
allocation id
-                       taskManager.requestSlot(slotId, jobId, allocationId, 
jobManagerAddress, resourceManagerLeaderId);
+                       taskManager.requestSlot(slotId, jobId, allocationId, 
jobManagerAddress, resourceManagerLeaderId, timeout);
 
                        // now inform the task manager about the new job leader
                        
jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, 
jobManagerLeaderId);
@@ -925,7 +930,7 @@ public class TaskExecutorTest extends TestLogger {
 
                when(jobMasterGateway.offerSlots(
                                any(ResourceID.class), any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class)))
-                       
.thenReturn(CompletableFuture.completedFuture((Iterable<SlotOffer>)Collections.singleton(offer1)));
+                       
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -1045,18 +1050,26 @@ public class TaskExecutorTest extends TestLogger {
 
                        // test that allocating a slot works
                        final SlotID slotID = new SlotID(resourceID, 0);
-                       taskManager.requestSlot(slotID, jobId, new 
AllocationID(), jobManagerAddress, leaderId);
+                       taskManager.requestSlot(slotID, jobId, new 
AllocationID(), jobManagerAddress, leaderId, timeout);
 
                        // TODO: Figure out the concrete allocation behaviour 
between RM and TM. Maybe we don't need the SlotID...
                        // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
                        final SlotID unconfirmedFreeSlotID = new 
SlotID(resourceID, 1);
 
+                       CompletableFuture<Acknowledge> requestSlotFuture = 
taskManager.requestSlot(
+                               unconfirmedFreeSlotID,
+                               jobId,
+                               new AllocationID(),
+                               jobManagerAddress,
+                               leaderId,
+                               timeout);
+
                        try {
-                               taskManager.requestSlot(unconfirmedFreeSlotID, 
jobId, new AllocationID(), jobManagerAddress, leaderId);
+                               requestSlotFuture.get();
 
                                fail("The slot request should have failed.");
-                       } catch (SlotAllocationException e) {
-                               // expected
+                       } catch (Exception e) {
+                               assertTrue(ExceptionUtils.containsThrowable(e, 
SlotAllocationException.class));
                        }
 
                        // re-register
@@ -1066,7 +1079,13 @@ public class TaskExecutorTest extends TestLogger {
 
                        // now we should be successful because the slots status 
has been synced
                        // test that we can now allocate the slot which was 
previously blacklisted due to pending confirmation of the RM
-                       taskManager.requestSlot(unconfirmedFreeSlotID, jobId, 
new AllocationID(), jobManagerAddress, leaderId);
+                       taskManager.requestSlot(
+                               unconfirmedFreeSlotID,
+                               jobId,
+                               new AllocationID(),
+                               jobManagerAddress,
+                               leaderId,
+                               timeout);
 
                        // check if a concurrent error occurred
                        testingFatalErrorHandler.rethrowError();
@@ -1221,7 +1240,7 @@ public class TaskExecutorTest extends TestLogger {
                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                
Collections.<InputGateDeploymentDescriptor>emptyList());
 
-                       CompletableFuture<Iterable<SlotOffer>> 
offerResultFuture = new CompletableFuture<>();
+                       CompletableFuture<Collection<SlotOffer>> 
offerResultFuture = new CompletableFuture<>();
 
                        // submit task first and then return acceptance response
                        when(
@@ -1239,7 +1258,7 @@ public class TaskExecutorTest extends TestLogger {
                        
verify(jobMasterGateway).offerSlots(any(ResourceID.class), any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class));
 
                        // submit the task without having acknowledged the 
offered slots
-                       taskManager.submitTask(tdd, jobManagerLeaderId);
+                       taskManager.submitTask(tdd, jobManagerLeaderId, 
timeout);
 
                        // acknowledge the offered slots
                        
offerResultFuture.complete(Collections.singleton(offer1));

Reply via email to