Repository: flink
Updated Branches:
  refs/heads/master 775d7fed1 -> 56aefcd9e


[FLINK-6434] cancel slot allocation if request timed out in ProviderAndOwner

Summary:
This fix flink jira #6434
1. Let the ProviderAndOwner generate the allcation id before calling 
allocateSlot to slot pool.
2. If the allocateSlot call timed out, ProviderAndOwner cancel the previos 
allocation to slot pool.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D323990

This closes #4937.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7481977
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7481977
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7481977

Branch: refs/heads/master
Commit: f7481977cb558c12f43901f9e3bbe4b10f23b0b4
Parents: 775d7fe
Author: shuai.xus <shuai....@alibaba-inc.com>
Authored: Fri Oct 20 17:12:39 2017 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Nov 10 11:23:22 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  63 +++++++-
 .../flink/runtime/instance/SlotPoolGateway.java |  20 +++
 .../flink/runtime/instance/SlotPoolRpcTest.java | 147 ++++++++++++++++++-
 .../flink/runtime/instance/SlotPoolTest.java    |  21 +--
 4 files changed, 233 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 12dbc63..b033319 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -263,12 +264,13 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
 
        @Override
        public CompletableFuture<SimpleSlot> allocateSlot(
+                       AllocationID allocationID,
                        ScheduledUnit task,
                        ResourceProfile resources,
                        Iterable<TaskManagerLocation> locationPreferences,
                        Time timeout) {
 
-               return internalAllocateSlot(task, resources, 
locationPreferences);
+               return internalAllocateSlot(allocationID, task, resources, 
locationPreferences);
        }
 
        @Override
@@ -276,8 +278,28 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
                internalReturnAllocatedSlot(slot);
        }
 
+       @Override
+       public void cancelSlotAllocation(AllocationID allocationID) {
+               if (waitingForResourceManager.remove(allocationID) == null) {
+
+                       PendingRequest request = 
pendingRequests.remove(allocationID);
+                       if (request != null) {
+                               failPendingRequest(request, new 
CancellationException("Allocation " + allocationID + " cancelled"));
+                       } else {
+
+                               Slot slot = allocatedSlots.get(allocationID);
+                               if (slot != null) {
+                                       LOG.info("Return allocated slot {} by 
cancelling allocation {}.", slot, allocationID);
+                                       if (slot.markCancelled()) {
+                                               
internalReturnAllocatedSlot(slot);
+                                       }
+                               }
+                       }
+               }
+       }
 
        CompletableFuture<SimpleSlot> internalAllocateSlot(
+                       AllocationID allocationID,
                        ScheduledUnit task,
                        ResourceProfile resources,
                        Iterable<TaskManagerLocation> locationPreferences) {
@@ -291,7 +313,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
                }
 
                // the request will be completed by a future
-               final AllocationID allocationID = new AllocationID();
                final CompletableFuture<SimpleSlot> future = new 
CompletableFuture<>();
 
                // (2) need to request a slot
@@ -369,8 +390,14 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
 
        private void checkTimeoutSlotAllocation(AllocationID allocationID) {
                PendingRequest request = pendingRequests.remove(allocationID);
-               if (request != null && !request.getFuture().isDone()) {
-                       request.getFuture().completeExceptionally(new 
TimeoutException("Slot allocation request timed out"));
+               if (request != null) {
+                       failPendingRequest(request, new TimeoutException("Slot 
allocation request " + allocationID + " timed out"));
+               }
+       }
+
+       private void failPendingRequest(PendingRequest pendingRequest, 
Exception e) {
+               if (!pendingRequest.getFuture().isDone()) {
+                       pendingRequest.getFuture().completeExceptionally(e);
                }
        }
 
@@ -394,8 +421,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
 
        private void checkTimeoutRequestWaitingForResourceManager(AllocationID 
allocationID) {
                PendingRequest request = 
waitingForResourceManager.remove(allocationID);
-               if (request != null && !request.getFuture().isDone()) {
-                       request.getFuture().completeExceptionally(new 
NoResourceAvailableException(
+               if (request != null) {
+                       failPendingRequest(request, new 
NoResourceAvailableException(
                                        "No slot available and no connection to 
Resource Manager established."));
                }
        }
@@ -652,6 +679,20 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
                return allocatedSlots;
        }
 
+       @VisibleForTesting
+       AvailableSlots getAvailableSlots() {
+               return availableSlots;
+       }
+
+       public CompletableFuture<Integer> 
getNumberOfWaitingForResourceRequests() {
+               return 
CompletableFuture.completedFuture(waitingForResourceManager.size());
+       }
+
+       @Override
+       public CompletableFuture<Integer> getNumberOfPendingRequests() {
+               return 
CompletableFuture.completedFuture(pendingRequests.size());
+       }
+
        // 
------------------------------------------------------------------------
        //  Helper classes
        // 
------------------------------------------------------------------------
@@ -1014,7 +1055,15 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
                                boolean allowQueued,
                                Collection<TaskManagerLocation> 
preferredLocations) {
 
-                       return gateway.allocateSlot(task, 
ResourceProfile.UNKNOWN, preferredLocations, timeout);
+                       final AllocationID allocationID = new AllocationID();
+                       CompletableFuture<SimpleSlot> slotFuture = 
gateway.allocateSlot(allocationID, task, ResourceProfile.UNKNOWN, 
preferredLocations, timeout);
+                       slotFuture.whenComplete(
+                               (SimpleSlot slot, Throwable failure) -> {
+                                       if (failure != null) {
+                                               
gateway.cancelSlotAllocation(allocationID);
+                                       }
+                       });
+                       return slotFuture;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 06c4b12..02d5d38 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -86,10 +87,29 @@ public interface SlotPoolGateway extends RpcGateway {
        // 
------------------------------------------------------------------------
 
        CompletableFuture<SimpleSlot> allocateSlot(
+                       AllocationID allocationID,
                        ScheduledUnit task,
                        ResourceProfile resources,
                        Iterable<TaskManagerLocation> locationPreferences,
                        @RpcTimeout Time timeout);
 
        void returnAllocatedSlot(Slot slot);
+
+       /**
+        * Cancel a slot allocation.
+        * This method should be called when the CompletableFuture returned by 
allocateSlot completed exceptionally.
+        *
+        * @param allocationID the unique id for the previous allocation
+        */
+       void cancelSlotAllocation(AllocationID allocationID);
+
+       // 
------------------------------------------------------------------------
+       //  exposing internal statistic, mainly for testing
+       // 
------------------------------------------------------------------------
+
+       @VisibleForTesting
+       CompletableFuture<Integer> getNumberOfWaitingForResourceRequests();
+
+       @VisibleForTesting
+       CompletableFuture<Integer> getNumberOfPendingRequests();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 9d742e2..b521b75 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -19,18 +19,24 @@
 package org.apache.flink.runtime.instance;
 
 import akka.actor.ActorSystem;
+import akka.pattern.AskTimeoutException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.util.clock.SystemClock;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,6 +48,8 @@ import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
@@ -83,7 +91,7 @@ public class SlotPoolRpcTest extends TestLogger {
                );
                pool.start(JobMasterId.generate(), "foobar");
 
-               CompletableFuture<SimpleSlot> future = 
pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, 
Time.days(1));
+               CompletableFuture<SimpleSlot> future = pool.allocateSlot(new 
AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, 
Time.days(1));
 
                try {
                        future.get(4, TimeUnit.SECONDS);
@@ -99,4 +107,139 @@ public class SlotPoolRpcTest extends TestLogger {
                        fail("wrong exception: " + e);
                }
        }
+
+       @Test
+       public void testCancelSlotAllocationWithoutResourceManager() throws 
Exception {
+               final JobID jid = new JobID();
+
+               final SlotPool pool = new SlotPool(
+                               rpcService, jid,
+                               SystemClock.getInstance(),
+                               Time.days(1), Time.days(1),
+                               Time.seconds(1) // this is the timeout for the 
request tested here
+               );
+               pool.start(JobMasterId.generate(), "foobar");
+               SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+               AllocationID allocationID = new AllocationID();
+               CompletableFuture<SimpleSlot> future = 
slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+
+               try {
+                       future.get(500, TimeUnit.MILLISECONDS);
+                       fail("We expected a AskTimeoutException.");
+               } catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof AskTimeoutException);
+               }
+
+               assertEquals(1, 
slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue());
+
+               pool.cancelSlotAllocation(allocationID);
+               assertEquals(0, 
slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue());
+       }
+
+       @Test
+       public void testCancelSlotAllocationWithResourceManager() throws 
Exception {
+               final JobID jid = new JobID();
+
+               final SlotPool pool = new SlotPool(
+                               rpcService, jid,
+                               SystemClock.getInstance(),
+                               Time.days(1), Time.days(1),
+                               Time.seconds(1) // this is the timeout for the 
request tested here
+               );
+               pool.start(JobMasterId.generate(), "foobar");
+               SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+               ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
+               pool.connectToResourceManager(resourceManagerGateway);
+
+               AllocationID allocationID = new AllocationID();
+               CompletableFuture<SimpleSlot> future = 
slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+
+               try {
+                       future.get(500, TimeUnit.MILLISECONDS);
+                       fail("We expected a AskTimeoutException.");
+               } catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof AskTimeoutException);
+               }
+
+               assertEquals(1, 
slotPoolGateway.getNumberOfPendingRequests().get().intValue());
+
+               pool.cancelSlotAllocation(allocationID);
+               assertEquals(0, 
slotPoolGateway.getNumberOfPendingRequests().get().intValue());
+       }
+
+       @Test
+       public void testCancelSlotAllocationWhileSlotFulfilled() throws 
Exception {
+               final JobID jid = new JobID();
+
+               final SlotPool pool = new SlotPool(
+                               rpcService, jid,
+                               SystemClock.getInstance(),
+                               Time.days(1), Time.days(1),
+                               Time.seconds(1) // this is the timeout for the 
request tested here
+               );
+               pool.start(JobMasterId.generate(), "foobar");
+               SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+               ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
+               pool.connectToResourceManager(resourceManagerGateway);
+
+               AllocationID allocationID = new AllocationID();
+               CompletableFuture<SimpleSlot> future = 
slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+
+               try {
+                       future.get(500, TimeUnit.MILLISECONDS);
+                       fail("We expected a AskTimeoutException.");
+               }
+               catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof AskTimeoutException);
+               }
+
+               ResourceID resourceID = ResourceID.generate();
+               AllocatedSlot allocatedSlot = 
SlotPoolTest.createAllocatedSlot(resourceID, allocationID, jid, 
DEFAULT_TESTING_PROFILE);
+               slotPoolGateway.registerTaskManager(resourceID);
+               assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+               assertEquals(0, 
slotPoolGateway.getNumberOfPendingRequests().get().intValue());
+               assertTrue(pool.getAllocatedSlots().contains(allocationID));
+
+               pool.cancelSlotAllocation(allocationID);
+               assertFalse(pool.getAllocatedSlots().contains(allocationID));
+               assertTrue(pool.getAvailableSlots().contains(allocationID));
+       }
+
+       /**
+        * This case make sure when allocateSlot in ProviderAndOwner timeout,
+        * it will automatically call cancelSlotAllocation as will inject 
future.whenComplete in ProviderAndOwner.
+        */
+       @Test
+       public void testProviderAndOwner() throws Exception {
+               final JobID jid = new JobID();
+
+               final SlotPool pool = new SlotPool(
+                               rpcService, jid,
+                               SystemClock.getInstance(),
+                               Time.milliseconds(100), Time.days(1),
+                               Time.seconds(1) // this is the timeout for the 
request tested here
+               );
+               pool.start(JobMasterId.generate(), "foobar");
+               ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
+               pool.connectToResourceManager(resourceManagerGateway);
+
+               ScheduledUnit mockScheduledUnit = new 
ScheduledUnit(SchedulerTestUtils.getDummyTask());
+
+               // test the pending request is clear when timed out
+               CompletableFuture<SimpleSlot> future = 
pool.getSlotProvider().allocateSlot(mockScheduledUnit, true, null);
+
+               try {
+                       future.get(500, TimeUnit.MILLISECONDS);
+                       fail("We expected a AskTimeoutException.");
+               }
+               catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof AskTimeoutException);
+               }
+
+               assertEquals(0, 
pool.getSelfGateway(SlotPoolGateway.class).getNumberOfPendingRequests().get().intValue());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index f38894e..bee77e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -95,8 +95,8 @@ public class SlotPoolTest extends TestLogger {
                        ResourceID resourceID = new ResourceID("resource");
                        slotPoolGateway.registerTaskManager(resourceID);
 
-                       ScheduledUnit task = mock(ScheduledUnit.class);
-                       CompletableFuture<SimpleSlot> future = 
slotPoolGateway.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, timeout);
+                       AllocationID allocationID = new AllocationID();
+                       CompletableFuture<SimpleSlot> future = 
slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
                        assertFalse(future.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
@@ -104,6 +104,8 @@ public class SlotPoolTest extends TestLogger {
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
+                       assertEquals(allocationID, 
slotRequest.getAllocationId());
+
                        AllocatedSlot allocatedSlot = 
createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, 
DEFAULT_TESTING_PROFILE);
                        
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
@@ -129,8 +131,8 @@ public class SlotPoolTest extends TestLogger {
                        ResourceID resourceID = new ResourceID("resource");
                        slotPool.registerTaskManager(resourceID);
 
-                       CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
-                       CompletableFuture<SimpleSlot> future2 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
+                       CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
+                       CompletableFuture<SimpleSlot> future2 = 
slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
 
                        assertFalse(future1.isDone());
                        assertFalse(future2.isDone());
@@ -176,7 +178,7 @@ public class SlotPoolTest extends TestLogger {
                        ResourceID resourceID = new ResourceID("resource");
                        slotPoolGateway.registerTaskManager(resourceID);
 
-                       CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
+                       CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
                        assertFalse(future1.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
@@ -193,7 +195,7 @@ public class SlotPoolTest extends TestLogger {
                        // return this slot to pool
                        slot1.releaseSlot();
 
-                       CompletableFuture<SimpleSlot> future2 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
+                       CompletableFuture<SimpleSlot> future2 = 
slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
 
                        // second allocation fulfilled by previous slot 
returning
                        SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -219,7 +221,7 @@ public class SlotPoolTest extends TestLogger {
                        ResourceID resourceID = new ResourceID("resource");
                        slotPoolGateway.registerTaskManager(resourceID);
 
-                       CompletableFuture<SimpleSlot> future = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
+                       CompletableFuture<SimpleSlot> future = 
slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
                        assertFalse(future.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
@@ -275,14 +277,14 @@ public class SlotPoolTest extends TestLogger {
                        ResourceID resourceID = new ResourceID("resource");
                        slotPoolGateway.registerTaskManager(resourceID);
 
-                       CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
+                       CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
                        verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
-                       CompletableFuture<SimpleSlot> future2 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
+                       CompletableFuture<SimpleSlot> future2 = 
slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
 
                        AllocatedSlot allocatedSlot = 
createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, 
DEFAULT_TESTING_PROFILE);
                        
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
@@ -333,6 +335,7 @@ public class SlotPoolTest extends TestLogger {
                        
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
                        CompletableFuture<SimpleSlot> slotFuture = 
slotPoolGateway.allocateSlot(
+                               new AllocationID(),
                                scheduledUnit,
                                ResourceProfile.UNKNOWN,
                                Collections.emptyList(),

Reply via email to