Repository: flink Updated Branches: refs/heads/master 9f790d3ef -> d95d20eb4
http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index a4f0e03..b5a3c80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -57,6 +57,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -74,6 +75,7 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.Ignore; @@ -89,6 +91,7 @@ import org.slf4j.Logger; import java.net.InetAddress; import java.net.URL; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -106,6 +109,8 @@ import static org.mockito.Mockito.*; public class TaskExecutorTest extends TestLogger { + private final Time timeout = Time.milliseconds(10000L); + @Rule public TestName name = new TestName(); @@ -426,7 +431,7 @@ public class TaskExecutorTest extends TestLogger { verify(heartbeatManager, timeout(timeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class)); - TaskExecutorGateway taskExecutorGateway = taskManager.getSelf(); + TaskExecutorGateway taskExecutorGateway = taskManager.getSelfGateway(TaskExecutorGateway.class); // trigger the heartbeat asynchronously taskExecutorGateway.heartbeatFromResourceManager(rmResourceId); @@ -723,7 +728,7 @@ public class TaskExecutorTest extends TestLogger { taskManager.start(); - taskManager.submitTask(tdd, jobManagerLeaderId); + taskManager.submitTask(tdd, jobManagerLeaderId, timeout); CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture; @@ -844,7 +849,7 @@ public class TaskExecutorTest extends TestLogger { resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId); // request slots from the task manager under the given allocation id - taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId); + taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId, timeout); // now inform the task manager about the new job leader jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId); @@ -925,7 +930,7 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.offerSlots( any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture((Iterable<SlotOffer>)Collections.singleton(offer1))); + .thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1))); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, jobMasterGateway); @@ -1045,18 +1050,26 @@ public class TaskExecutorTest extends TestLogger { // test that allocating a slot works final SlotID slotID = new SlotID(resourceID, 0); - taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId); + taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId, timeout); // TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID... // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1); + CompletableFuture<Acknowledge> requestSlotFuture = taskManager.requestSlot( + unconfirmedFreeSlotID, + jobId, + new AllocationID(), + jobManagerAddress, + leaderId, + timeout); + try { - taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); + requestSlotFuture.get(); fail("The slot request should have failed."); - } catch (SlotAllocationException e) { - // expected + } catch (Exception e) { + assertTrue(ExceptionUtils.containsThrowable(e, SlotAllocationException.class)); } // re-register @@ -1066,7 +1079,13 @@ public class TaskExecutorTest extends TestLogger { // now we should be successful because the slots status has been synced // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM - taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); + taskManager.requestSlot( + unconfirmedFreeSlotID, + jobId, + new AllocationID(), + jobManagerAddress, + leaderId, + timeout); // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); @@ -1221,7 +1240,7 @@ public class TaskExecutorTest extends TestLogger { Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList()); - CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new CompletableFuture<>(); + CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>(); // submit task first and then return acceptance response when( @@ -1239,7 +1258,7 @@ public class TaskExecutorTest extends TestLogger { verify(jobMasterGateway).offerSlots(any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)); // submit the task without having acknowledge the offered slots - taskManager.submitTask(tdd, jobManagerLeaderId); + taskManager.submitTask(tdd, jobManagerLeaderId, timeout); // acknowledge the offered slots offerResultFuture.complete(Collections.singleton(offer1));
