This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0ba980caed9f26095bc8134bbe581635bb98dde
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Sep 21 17:24:49 2018 +0200

    [hotfix] Remove mocking from SlotProtocolTest
---
 .../slotmanager/SlotProtocolTest.java              | 46 +++++++++++-----------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 51e6b0b..66966cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,13 +33,13 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
@@ -50,12 +50,10 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for the slot allocation protocol.
+ */
 public class SlotProtocolTest extends TestLogger {
 
        private static final long timeout = 10000L;
@@ -92,7 +90,7 @@ public class SlotProtocolTest extends TestLogger {
 
                        final CompletableFuture<ResourceProfile> resourceProfileFuture = new CompletableFuture<>();
                        ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-                               .setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile))
+                               .setAllocateResourceConsumer(resourceProfileFuture::complete)
                                .build();
 
                        slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
@@ -108,11 +106,13 @@ public class SlotProtocolTest extends TestLogger {
                        assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile())));
 
                        // slot becomes available
-                       TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-                       Mockito.when(
-                               taskExecutorGateway
-                                       .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
-                               .thenReturn(mock(CompletableFuture.class));
+                       final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+                       TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+                               .setRequestSlotFunction(tuple5 -> {
+                                       requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+                                       return new CompletableFuture<>();
+                               })
+                               .createTestingTaskExecutorGateway();
 
                        final ResourceID resourceID = ResourceID.generate();
                        final SlotID slotID = new SlotID(resourceID, 0);
@@ -125,8 +125,7 @@ public class SlotProtocolTest extends TestLogger {
                        slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
                        // 4) Slot becomes available and TaskExecutor gets a SlotRequest
-                       verify(taskExecutorGateway, timeout(5000L))
-                               .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+                       assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
                }
        }
 
@@ -143,11 +142,13 @@ public class SlotProtocolTest extends TestLogger {
 
                final ResourceManagerId rmLeaderID = ResourceManagerId.generate();
 
-               TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-               Mockito.when(
-                       taskExecutorGateway
-                               .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
-                       .thenReturn(mock(CompletableFuture.class));
+               final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+               TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+                               return new CompletableFuture<>();
+                       })
+                       .createTestingTaskExecutorGateway();
 
                try (SlotManager slotManager = new SlotManager(
                        scheduledExecutor,
@@ -155,7 +156,7 @@ public class SlotProtocolTest extends TestLogger {
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime())) {
 
-                       ResourceActions resourceManagerActions = mock(ResourceActions.class);
+                       ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
                        slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 
@@ -178,8 +179,7 @@ public class SlotProtocolTest extends TestLogger {
                        slotManager.registerSlotRequest(slotRequest);
 
                        // a SlotRequest is routed to the TaskExecutor
-                       verify(taskExecutorGateway, timeout(5000))
-                               .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+                       assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
                }
        }
 }

Reply via email to