Repository: flink
Updated Branches:
  refs/heads/master 432e48a2e -> 89cfeaa88


[FLINK-9456] Reuse TestingResourceActions in SlotManagerTest#testNotifyFailedAllocationWhenTaskManagerTerminated


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89cfeaa8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89cfeaa8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89cfeaa8

Branch: refs/heads/master
Commit: 89cfeaa882f9e68df2bd215563622b48c29a9ec9
Parents: 50c0ea8
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Jul 1 21:08:41 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sun Jul 1 21:10:04 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/TaskManagerSlot.java |   1 +
 .../slotmanager/SlotManager.java                |  15 ++-
 .../exceptions/SlotOccupiedException.java       |  18 +--
 .../slotmanager/SlotManagerTest.java            | 113 +++++++++++--------
 .../slotmanager/TestingResourceActions.java     |  32 +++++-
 .../TestingResourceActionsBuilder.java          |  56 +++++++++
 6 files changed, 167 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index be39424..633f31a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -90,6 +90,7 @@ public class TaskManagerSlot {
                return allocationId;
        }
 
+       @Nullable
        public JobID getJobId() {
                return jobId;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d0d03f5..b2dbba8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -541,6 +541,7 @@ public class SlotManager implements AutoCloseable {
         *
         * @param slotId to update
         * @param allocationId specifying the current allocation of the slot
+        * @param jobId specifying the job to which the slot is allocated
         * @return True if the slot could be updated; otherwise false
         */
        private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
@@ -565,10 +566,10 @@ public class SlotManager implements AutoCloseable {
        }
 
        private void updateSlotState(
-               TaskManagerSlot slot,
-               TaskManagerRegistration taskManagerRegistration,
-               @Nullable AllocationID allocationId,
-               @Nullable JobID jobId) {
+                       TaskManagerSlot slot,
+                       TaskManagerRegistration taskManagerRegistration,
+                       @Nullable AllocationID allocationId,
+                       @Nullable JobID jobId) {
                if (null != allocationId) {
                        switch (slot.getState()) {
                                case PENDING:
@@ -773,10 +774,14 @@ public class SlotManager implements AutoCloseable {
                        }
 
                        AllocationID oldAllocationId = slot.getAllocationId();
+
                        if (oldAllocationId != null) {
                                fulfilledSlotRequests.remove(oldAllocationId);
+
                                resourceActions.notifyAllocationFailure(
-                                       slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+                                       slot.getJobId(),
+                                       oldAllocationId,
+                                       new FlinkException("The assigned slot " + slot.getSlotId() + " was removed."));
                        }
                } else {
                        LOG.debug("There was no slot registered with slot id {}.", slotId);

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
index 818754c..cb528de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -22,6 +22,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.util.Preconditions;
 
+/**
+ * Exception which signals that a slot is already occupied by the given
+ * {@link AllocationID}.
+ */
 public class SlotOccupiedException extends SlotAllocationException {
        private static final long serialVersionUID = -3986333914244338888L;
 
@@ -32,19 +36,7 @@ public class SlotOccupiedException extends SlotAllocationException {
        public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) {
                super(message);
                this.allocationId = Preconditions.checkNotNull(allocationId);
-               this.jobId = jobId;
-       }
-
-       public SlotOccupiedException(String message, Throwable cause, 
AllocationID allocationId, JobID jobId) {
-               super(message, cause);
-               this.allocationId = Preconditions.checkNotNull(allocationId);
-               this.jobId = jobId;
-       }
-
-       public SlotOccupiedException(Throwable cause, AllocationID 
allocationId, JobID jobId) {
-               super(cause);
-               this.allocationId = Preconditions.checkNotNull(allocationId);
-               this.jobId = jobId;
+               this.jobId = Preconditions.checkNotNull(jobId);
        }
 
        public AllocationID getAllocationId() {

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 1b072d7..fb82aa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -49,11 +50,14 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import java.util.ArrayList;
+import javax.annotation.Nonnull;
+
+import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -64,9 +68,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1161,7 +1166,8 @@ public class SlotManagerTest extends TestLogger {
         */
        @Test
        public void testSlotRequestFailure() throws Exception {
-               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) {
+               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(),
+                       new 
TestingResourceActionsBuilder().createTestingResourceActions())) {
 
                        final SlotRequest slotRequest = new SlotRequest(new 
JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
                        slotManager.registerSlotRequest(slotRequest);
@@ -1216,93 +1222,106 @@ public class SlotManagerTest extends TestLogger {
        @Test
        public void testNotifyFailedAllocationWhenTaskManagerTerminated() 
throws Exception {
 
-               final List<Tuple2<JobID, AllocationID>> 
notifiedTaskManagerInfos = new ArrayList<>();
+               final Queue<Tuple2<JobID, AllocationID>> allocationFailures = 
new ArrayDeque<>(5);
 
-               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
-                               @Override
-                               public void notifyAllocationFailure(JobID 
jobId, AllocationID allocationId, Exception cause) {
-                                       notifiedTaskManagerInfos.add(new 
Tuple2<>(jobId, allocationId));
-                               }})) {
+               final TestingResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setNotifyAllocationFailureConsumer(
+                               (Tuple3<JobID, AllocationID, Exception> 
failureMessage) ->
+                                       
allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1)))
+                       .createTestingResourceActions();
+
+               try (final SlotManager slotManager = createSlotManager(
+                       ResourceManagerId.generate(),
+                       resourceManagerActions)) {
 
                        // register slot request for job1.
                        JobID jobId1 = new JobID();
-                       final SlotRequest slotRequest11 = new 
SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
-                       final SlotRequest slotRequest12 = new 
SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+                       final SlotRequest slotRequest11 = 
createSlotRequest(jobId1);
+                       final SlotRequest slotRequest12 = 
createSlotRequest(jobId1);
                        slotManager.registerSlotRequest(slotRequest11);
                        slotManager.registerSlotRequest(slotRequest12);
 
                        // create task-manager-1 with 2 slots.
                        final ResourceID taskExecutorResourceId1 = 
ResourceID.generate();
-                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder()
-                               .createTestingTaskExecutorGateway();
+                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway1 = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
                        final TaskExecutorConnection taskExecutionConnection1 = 
new TaskExecutorConnection(taskExecutorResourceId1, 
testingTaskExecutorGateway1);
-                       final Set<SlotStatus> tm1SlotStatusList = new 
HashSet<>();
-                       tm1SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN));
-                       tm1SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN));
+                       final SlotReport slotReport1 = 
createSlotReport(taskExecutorResourceId1, 2);
 
                        // register the task-manager-1 to the slot manager, 
this will trigger the slot allocation for job1.
-                       
slotManager.registerTaskManager(taskExecutionConnection1, new 
SlotReport(tm1SlotStatusList));
+                       
slotManager.registerTaskManager(taskExecutionConnection1, slotReport1);
 
                        // register slot request for job2.
                        JobID jobId2 = new JobID();
-                       final SlotRequest slotRequest21 = new 
SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
-                       final SlotRequest slotRequest22 = new 
SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+                       final SlotRequest slotRequest21 = 
createSlotRequest(jobId2);
+                       final SlotRequest slotRequest22 = 
createSlotRequest(jobId2);
                        slotManager.registerSlotRequest(slotRequest21);
                        slotManager.registerSlotRequest(slotRequest22);
 
                        // register slot request for job3.
                        JobID jobId3 = new JobID();
-                       final SlotRequest slotRequest31 = new 
SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3");
+                       final SlotRequest slotRequest31 = 
createSlotRequest(jobId3);
                        slotManager.registerSlotRequest(slotRequest31);
 
                        // create task-manager-2 with 3 slots.
                        final ResourceID taskExecutorResourceId2 = 
ResourceID.generate();
-                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder()
-                               .createTestingTaskExecutorGateway();
+                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway2 = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
                        final TaskExecutorConnection taskExecutionConnection2 = 
new TaskExecutorConnection(taskExecutorResourceId2, 
testingTaskExecutorGateway2);
-                       final Set<SlotStatus> tm2SlotStatusList = new 
HashSet<>();
-                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN));
-                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN));
-                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN));
-                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN));
+                       final SlotReport slotReport2 = 
createSlotReport(taskExecutorResourceId2, 3);
 
                        // register the task-manager-2 to the slot manager, 
this will trigger the slot allocation for job2 and job3.
-                       
slotManager.registerTaskManager(taskExecutionConnection2, new 
SlotReport(tm2SlotStatusList));
-
-                       // --------------------- valid the notify task manager 
terminated ------------------------
+                       
slotManager.registerTaskManager(taskExecutionConnection2, slotReport2);
 
-                       // valid for job1.
+                       // validate for job1.
                        
slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());
 
-                       assertEquals(2, notifiedTaskManagerInfos.size());
+                       assertThat(allocationFailures, hasSize(2));
 
-                       assertThat(jobId1, 
equalTo(notifiedTaskManagerInfos.get(0).f0));
-                       assertThat(jobId1, 
equalTo(notifiedTaskManagerInfos.get(1).f0));
+                       Tuple2<JobID, AllocationID> allocationFailure;
+                       final Set<AllocationID> failedAllocations = new 
HashSet<>(2);
 
-                       assertEquals(Stream.of(slotRequest11.getAllocationId(), 
slotRequest12.getAllocationId()).collect(Collectors.toSet()),
-                               Stream.of(notifiedTaskManagerInfos.get(0).f1, 
notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet()));
+                       while ((allocationFailure = allocationFailures.poll()) 
!= null) {
+                               assertThat(allocationFailure.f0, 
equalTo(jobId1));
+                               failedAllocations.add(allocationFailure.f1);
+                       }
 
-                       notifiedTaskManagerInfos.clear();
+                       assertThat(failedAllocations, 
containsInAnyOrder(slotRequest11.getAllocationId(), 
slotRequest12.getAllocationId()));
 
-                       // valid the result for job2 and job3.
+                       // validate the result for job2 and job3.
                        
slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());
 
-                       assertEquals(3, notifiedTaskManagerInfos.size());
+                       assertThat(allocationFailures, hasSize(3));
 
-                       Map<JobID, List<Tuple2<JobID, AllocationID>>> 
job2AndJob3FailedAllocationInfo = 
notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> 
tuple.f0));
+                       Map<JobID, List<Tuple2<JobID, AllocationID>>> 
job2AndJob3FailedAllocationInfo = 
allocationFailures.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));
 
-                       assertEquals(2, job2AndJob3FailedAllocationInfo.size());
+                       assertThat(job2AndJob3FailedAllocationInfo.entrySet(), 
hasSize(2));
 
-                       // valid for job2
-                       assertEquals(Stream.of(slotRequest21.getAllocationId(), 
slotRequest22.getAllocationId()).collect(Collectors.toSet()),
-                               
job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> 
tuple2.f1).collect(Collectors.toSet()));
+                       final Set<AllocationID> job2FailedAllocations = 
extractFailedAllocationsForJob(jobId2, job2AndJob3FailedAllocationInfo);
+                       final Set<AllocationID> job3FailedAllocations = 
extractFailedAllocationsForJob(jobId3, job2AndJob3FailedAllocationInfo);
 
-                       // valid for job3
-                       
assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()),
-                               
job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> 
tuple2.f1).collect(Collectors.toSet()));
+                       assertThat(job2FailedAllocations, 
containsInAnyOrder(slotRequest21.getAllocationId(), 
slotRequest22.getAllocationId()));
+                       assertThat(job3FailedAllocations, 
containsInAnyOrder(slotRequest31.getAllocationId()));
                }
        }
 
+       private Set<AllocationID> extractFailedAllocationsForJob(JobID jobId2, 
Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo) {
+               return 
job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(t -> 
t.f1).collect(Collectors.toSet());
+       }
+
+       @Nonnull
+       private SlotReport createSlotReport(ResourceID taskExecutorResourceId, 
int numberSlots) {
+               final Set<SlotStatus> slotStatusSet = new 
HashSet<>(numberSlots);
+               for (int i = 0; i < numberSlots; i++) {
+                       slotStatusSet.add(new SlotStatus(new 
SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN));
+               }
+
+               return new SlotReport(slotStatusSet);
+       }
+
+       @Nonnull
+       private SlotRequest createSlotRequest(JobID jobId1) {
+               return new SlotRequest(jobId1, new AllocationID(), 
ResourceProfile.UNKNOWN, "foobar1");
+       }
+
        private SlotManager createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions) {
                SlotManager slotManager = new SlotManager(
                        TestingUtils.defaultScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
index 915f142..8b7c802 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
@@ -19,26 +19,52 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
 
+import javax.annotation.Nonnull;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
 /**
  * Testing implementation of the {@link ResourceActions}.
  */
 public class TestingResourceActions implements ResourceActions {
+
+       @Nonnull
+       private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
+
+       @Nonnull
+       private final Consumer<ResourceProfile> allocateResourceConsumer;
+
+       @Nonnull
+       private final Consumer<Tuple3<JobID, AllocationID, Exception>> 
notifyAllocationFailureConsumer;
+
+       public TestingResourceActions(
+                       @Nonnull BiConsumer<InstanceID, Exception> 
releaseResourceConsumer,
+                       @Nonnull Consumer<ResourceProfile> 
allocateResourceConsumer,
+                       @Nonnull Consumer<Tuple3<JobID, AllocationID, 
Exception>> notifyAllocationFailureConsumer) {
+               this.releaseResourceConsumer = releaseResourceConsumer;
+               this.allocateResourceConsumer = allocateResourceConsumer;
+               this.notifyAllocationFailureConsumer = 
notifyAllocationFailureConsumer;
+       }
+
+
        @Override
        public void releaseResource(InstanceID instanceId, Exception cause) {
-
+               releaseResourceConsumer.accept(instanceId, cause);
        }
 
        @Override
        public void allocateResource(ResourceProfile resourceProfile) {
-
+               allocateResourceConsumer.accept(resourceProfile);
        }
 
        @Override
        public void notifyAllocationFailure(JobID jobId, AllocationID 
allocationId, Exception cause) {
-
+               notifyAllocationFailureConsumer.accept(Tuple3.of(jobId, 
allocationId, cause));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
new file mode 100644
index 0000000..2c1d47e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Builder for the {@link TestingResourceActions}.
+ */
+public class TestingResourceActionsBuilder {
+       private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
+       private Consumer<ResourceProfile> allocateResourceConsumer = (ignored) -> {};
+       private Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer = (ignored) -> {};
+
+       public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
+               this.releaseResourceConsumer = releaseResourceConsumer;
+               return this;
+       }
+
+       public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer) {
+               this.allocateResourceConsumer = allocateResourceConsumer;
+               return this;
+       }
+
+       public TestingResourceActionsBuilder setNotifyAllocationFailureConsumer(Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) {
+               this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer;
+               return this;
+       }
+
+       public TestingResourceActions createTestingResourceActions() {
+               return new TestingResourceActions(releaseResourceConsumer, allocateResourceConsumer, notifyAllocationFailureConsumer);
+       }
+}

Reply via email to