Repository: flink
Updated Branches:
  refs/heads/master 3debf47e5 -> 2d19d1100


[FLINK-8783] [tests] Harden SlotPoolRpcTest

Wait for releasing of timed out pending slot requests before checking the
number of pending slots requests.

This closes #5684.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d19d110
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d19d110
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d19d110

Branch: refs/heads/master
Commit: 2d19d11007d37298dffb78f3aa43d749f9e597ce
Parents: 3debf47
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Mar 12 18:04:38 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Mar 13 08:14:53 2018 +0100

----------------------------------------------------------------------
 .../runtime/jobmaster/slotpool/SlotPoolRpcTest.java     | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d19d110/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index cc837bc..4c736e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -195,6 +195,9 @@ public class SlotPoolRpcTest extends TestLogger {
                        pool.start(JobMasterId.generate(), "foobar");
                        SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
 
+                       final CompletableFuture<SlotRequestId> 
slotRequestTimeoutFuture = new CompletableFuture<>();
+                       
pool.setTimeoutPendingSlotRequestConsumer(slotRequestTimeoutFuture::complete);
+
                        ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
                        pool.connectToResourceManager(resourceManagerGateway);
 
@@ -213,6 +216,9 @@ public class SlotPoolRpcTest extends TestLogger {
                                
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
TimeoutException);
                        }
 
+                       // wait until we have timed out the slot request
+                       slotRequestTimeoutFuture.get();
+
                        assertEquals(0L, (long) 
pool.getNumberOfPendingRequests().get());
                } finally {
                        RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -243,6 +249,9 @@ public class SlotPoolRpcTest extends TestLogger {
                        resourceManagerGateway.setRequestSlotConsumer(
                                (SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
 
+                       final CompletableFuture<SlotRequestId> 
slotRequestTimeoutFuture = new CompletableFuture<>();
+                       
pool.setTimeoutPendingSlotRequestConsumer(slotRequestTimeoutFuture::complete);
+
                        pool.connectToResourceManager(resourceManagerGateway);
 
                        SlotRequestId requestId = new SlotRequestId();
@@ -260,6 +269,9 @@ public class SlotPoolRpcTest extends TestLogger {
                                
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
TimeoutException);
                        }
 
+                       // wait until we have timed out the slot request
+                       slotRequestTimeoutFuture.get();
+
                        assertEquals(0L, (long) 
pool.getNumberOfPendingRequests().get());
 
                        AllocationID allocationId = allocationIdFuture.get();

Reply via email to