This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b0ba980caed9f26095bc8134bbe581635bb98dde Author: Till Rohrmann <[email protected]> AuthorDate: Fri Sep 21 17:24:49 2018 +0200 [hotfix] Remove mocking from SlotProtocolTest --- .../slotmanager/SlotProtocolTest.java | 46 +++++++++++----------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 51e6b0b..66966cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -33,13 +33,13 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; -import org.mockito.Mockito; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -50,12 +50,10 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; +/** + * Tests for the slot allocation protocol. + */ public class SlotProtocolTest extends TestLogger { private static final long timeout = 10000L; @@ -92,7 +90,7 @@ public class SlotProtocolTest extends TestLogger { final CompletableFuture<ResourceProfile> resourceProfileFuture = new CompletableFuture<>(); ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() - .setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile)) + .setAllocateResourceConsumer(resourceProfileFuture::complete) .build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -108,11 +106,13 @@ public class SlotProtocolTest extends TestLogger { assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile()))); // slot becomes available - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final SlotID slotID = new SlotID(resourceID, 0); @@ -125,8 +125,7 @@ public class SlotProtocolTest extends TestLogger { slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport); // 4) Slot becomes available and TaskExecutor gets a SlotRequest - verify(taskExecutorGateway, timeout(5000L)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } @@ -143,11 +142,13 @@ public class SlotProtocolTest extends TestLogger { final ResourceManagerId rmLeaderID = ResourceManagerId.generate(); - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); try (SlotManager slotManager = new SlotManager( scheduledExecutor, @@ -155,7 +156,7 @@ public class SlotProtocolTest extends TestLogger { TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -178,8 +179,7 @@ public class SlotProtocolTest extends TestLogger { slotManager.registerSlotRequest(slotRequest); // a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } }
