Repository: flink Updated Branches: refs/heads/master 432e48a2e -> 89cfeaa88
[FLINK-9456] Reuse TestingResourceActions in SlotManagerTest#testNotifyFailedAllocationWhenTaskManagerTerminated Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89cfeaa8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89cfeaa8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89cfeaa8 Branch: refs/heads/master Commit: 89cfeaa882f9e68df2bd215563622b48c29a9ec9 Parents: 50c0ea8 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Jul 1 21:08:41 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sun Jul 1 21:10:04 2018 +0200 ---------------------------------------------------------------------- .../clusterframework/types/TaskManagerSlot.java | 1 + .../slotmanager/SlotManager.java | 15 ++- .../exceptions/SlotOccupiedException.java | 18 +-- .../slotmanager/SlotManagerTest.java | 113 +++++++++++-------- .../slotmanager/TestingResourceActions.java | 32 +++++- .../TestingResourceActionsBuilder.java | 56 +++++++++ 6 files changed, 167 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java index be39424..633f31a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java @@ -90,6 +90,7 @@ public class TaskManagerSlot { return allocationId; } + @Nullable public JobID getJobId() { return jobId; } http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index d0d03f5..b2dbba8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -541,6 +541,7 @@ public class SlotManager implements AutoCloseable { * * @param slotId to update * @param allocationId specifying the current allocation of the slot + * @param jobId specifying the job to which the slot is allocated * @return True if the slot could be updated; otherwise false */ private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) { @@ -565,10 +566,10 @@ public class SlotManager implements AutoCloseable { } private void updateSlotState( - TaskManagerSlot slot, - TaskManagerRegistration taskManagerRegistration, - @Nullable AllocationID allocationId, - @Nullable JobID jobId) { + TaskManagerSlot slot, + TaskManagerRegistration taskManagerRegistration, + @Nullable AllocationID allocationId, + @Nullable JobID jobId) { if (null != allocationId) { switch (slot.getState()) { case PENDING: @@ -773,10 +774,14 @@ public class SlotManager implements AutoCloseable { } AllocationID oldAllocationId = slot.getAllocationId(); + if (oldAllocationId != null) { fulfilledSlotRequests.remove(oldAllocationId); + resourceActions.notifyAllocationFailure( - slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed.")); + slot.getJobId(), + oldAllocationId, + new FlinkException("The assigned slot " + slot.getSlotId() + " was removed.")); } } else { LOG.debug("There was no slot registered with slot id {}.", slotId); http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java index 818754c..cb528de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java @@ -22,6 +22,10 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.util.Preconditions; +/** + * Exception which signals that a slot is already occupied by the given + * {@link AllocationID}. + */ public class SlotOccupiedException extends SlotAllocationException { private static final long serialVersionUID = -3986333914244338888L; @@ -32,19 +36,7 @@ public class SlotOccupiedException extends SlotAllocationException { public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) { super(message); this.allocationId = Preconditions.checkNotNull(allocationId); - this.jobId = jobId; - } - - public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId, JobID jobId) { - super(message, cause); - this.allocationId = Preconditions.checkNotNull(allocationId); - this.jobId = jobId; - } - - public SlotOccupiedException(Throwable cause, AllocationID allocationId, JobID jobId) { - super(cause); - this.allocationId = Preconditions.checkNotNull(allocationId); - this.jobId = jobId; + this.jobId = Preconditions.checkNotNull(jobId); } public AllocationID getAllocationId() { http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 1b072d7..fb82aa5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -49,11 +50,14 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import org.mockito.ArgumentCaptor; -import java.util.ArrayList; +import javax.annotation.Nonnull; + +import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -64,9 +68,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.Stream; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1161,7 +1166,8 @@ public class SlotManagerTest extends TestLogger { */ @Test public void testSlotRequestFailure() throws Exception { - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) { + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), + new TestingResourceActionsBuilder().createTestingResourceActions())) { final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); slotManager.registerSlotRequest(slotRequest); @@ -1216,93 +1222,106 @@ public class SlotManagerTest extends TestLogger { @Test public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception { - final List<Tuple2<JobID, AllocationID>> notifiedTaskManagerInfos = new ArrayList<>(); + final Queue<Tuple2<JobID, AllocationID>> allocationFailures = new ArrayDeque<>(5); - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() { - @Override - public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) { - notifiedTaskManagerInfos.add(new Tuple2<>(jobId, allocationId)); - }})) { + final TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setNotifyAllocationFailureConsumer( + (Tuple3<JobID, AllocationID, Exception> failureMessage) -> + allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1))) + .createTestingResourceActions(); + + try (final SlotManager slotManager = createSlotManager( + ResourceManagerId.generate(), + resourceManagerActions)) { // register slot request for job1. JobID jobId1 = new JobID(); - final SlotRequest slotRequest11 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); - final SlotRequest slotRequest12 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); + final SlotRequest slotRequest11 = createSlotRequest(jobId1); + final SlotRequest slotRequest12 = createSlotRequest(jobId1); slotManager.registerSlotRequest(slotRequest11); slotManager.registerSlotRequest(slotRequest12); // create task-manager-1 with 2 slots. final ResourceID taskExecutorResourceId1 = ResourceID.generate(); - final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder() - .createTestingTaskExecutorGateway(); + final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, testingTaskExecutorGateway1); - final Set<SlotStatus> tm1SlotStatusList = new HashSet<>(); - tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN)); - tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN)); + final SlotReport slotReport1 = createSlotReport(taskExecutorResourceId1, 2); // register the task-manager-1 to the slot manager, this will trigger the slot allocation for job1. - slotManager.registerTaskManager(taskExecutionConnection1, new SlotReport(tm1SlotStatusList)); + slotManager.registerTaskManager(taskExecutionConnection1, slotReport1); // register slot request for job2. JobID jobId2 = new JobID(); - final SlotRequest slotRequest21 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2"); - final SlotRequest slotRequest22 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2"); + final SlotRequest slotRequest21 = createSlotRequest(jobId2); + final SlotRequest slotRequest22 = createSlotRequest(jobId2); slotManager.registerSlotRequest(slotRequest21); slotManager.registerSlotRequest(slotRequest22); // register slot request for job3. JobID jobId3 = new JobID(); - final SlotRequest slotRequest31 = new SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3"); + final SlotRequest slotRequest31 = createSlotRequest(jobId3); slotManager.registerSlotRequest(slotRequest31); // create task-manager-2 with 3 slots. final ResourceID taskExecutorResourceId2 = ResourceID.generate(); - final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder() - .createTestingTaskExecutorGateway(); + final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, testingTaskExecutorGateway2); - final Set<SlotStatus> tm2SlotStatusList = new HashSet<>(); - tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN)); - tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN)); - tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN)); - tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN)); + final SlotReport slotReport2 = createSlotReport(taskExecutorResourceId2, 3); // register the task-manager-2 to the slot manager, this will trigger the slot allocation for job2 and job3. - slotManager.registerTaskManager(taskExecutionConnection2, new SlotReport(tm2SlotStatusList)); - - // --------------------- valid the notify task manager terminated ------------------------ + slotManager.registerTaskManager(taskExecutionConnection2, slotReport2); - // valid for job1. + // validate for job1. slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID()); - assertEquals(2, notifiedTaskManagerInfos.size()); + assertThat(allocationFailures, hasSize(2)); - assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(0).f0)); - assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(1).f0)); + Tuple2<JobID, AllocationID> allocationFailure; + final Set<AllocationID> failedAllocations = new HashSet<>(2); - assertEquals(Stream.of(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()).collect(Collectors.toSet()), - Stream.of(notifiedTaskManagerInfos.get(0).f1, notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet())); + while ((allocationFailure = allocationFailures.poll()) != null) { + assertThat(allocationFailure.f0, equalTo(jobId1)); + failedAllocations.add(allocationFailure.f1); + } - notifiedTaskManagerInfos.clear(); + assertThat(failedAllocations, containsInAnyOrder(slotRequest11.getAllocationId(), slotRequest12.getAllocationId())); - // valid the result for job2 and job3. + // validate the result for job2 and job3. slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID()); - assertEquals(3, notifiedTaskManagerInfos.size()); + assertThat(allocationFailures, hasSize(3)); - Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> tuple.f0)); + Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = allocationFailures.stream().collect(Collectors.groupingBy(tuple -> tuple.f0)); - assertEquals(2, job2AndJob3FailedAllocationInfo.size()); + assertThat(job2AndJob3FailedAllocationInfo.entrySet(), hasSize(2)); - // valid for job2 - assertEquals(Stream.of(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()).collect(Collectors.toSet()), - job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet())); + final Set<AllocationID> job2FailedAllocations = extractFailedAllocationsForJob(jobId2, job2AndJob3FailedAllocationInfo); + final Set<AllocationID> job3FailedAllocations = extractFailedAllocationsForJob(jobId3, job2AndJob3FailedAllocationInfo); - // valid for job3 - assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()), - job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet())); + assertThat(job2FailedAllocations, containsInAnyOrder(slotRequest21.getAllocationId(), slotRequest22.getAllocationId())); + assertThat(job3FailedAllocations, containsInAnyOrder(slotRequest31.getAllocationId())); } } + private Set<AllocationID> extractFailedAllocationsForJob(JobID jobId2, Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo) { + return job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(t -> t.f1).collect(Collectors.toSet()); + } + + @Nonnull + private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) { + final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots); + for (int i = 0; i < numberSlots; i++) { + slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN)); + } + + return new SlotReport(slotStatusSet); + } + + @Nonnull + private SlotRequest createSlotRequest(JobID jobId1) { + return new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); + } + private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { SlotManager slotManager = new SlotManager( TestingUtils.defaultScheduledExecutor(), http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java index 915f142..8b7c802 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java @@ -19,26 +19,52 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.InstanceID; +import javax.annotation.Nonnull; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * Testing implementation of the {@link ResourceActions}. */ public class TestingResourceActions implements ResourceActions { + + @Nonnull + private final BiConsumer<InstanceID, Exception> releaseResourceConsumer; + + @Nonnull + private final Consumer<ResourceProfile> allocateResourceConsumer; + + @Nonnull + private final Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer; + + public TestingResourceActions( + @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer, + @Nonnull Consumer<ResourceProfile> allocateResourceConsumer, + @Nonnull Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) { + this.releaseResourceConsumer = releaseResourceConsumer; + this.allocateResourceConsumer = allocateResourceConsumer; + this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer; + } + + @Override public void releaseResource(InstanceID instanceId, Exception cause) { - + releaseResourceConsumer.accept(instanceId, cause); } @Override public void allocateResource(ResourceProfile resourceProfile) { - + allocateResourceConsumer.accept(resourceProfile); } @Override public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) { - + notifyAllocationFailureConsumer.accept(Tuple3.of(jobId, allocationId, cause)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java new file mode 100644 index 0000000..2c1d47e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.instance.InstanceID; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +/** + * Builder for the {@link TestingResourceActions}. + */ +public class TestingResourceActionsBuilder { + private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {}; + private Consumer<ResourceProfile> allocateResourceConsumer = (ignored) -> {}; + private Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer = (ignored) -> {}; + + public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer<InstanceID, Exception> releaseResourceConsumer) { + this.releaseResourceConsumer = releaseResourceConsumer; + return this; + } + + public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer) { + this.allocateResourceConsumer = allocateResourceConsumer; + return this; + } + + public TestingResourceActionsBuilder setNotifyAllocationFailureConsumer(Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) { + this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer; + return this; + } + + public TestingResourceActions createTestingResourceActions() { + return new TestingResourceActions(releaseResourceConsumer, allocateResourceConsumer, notifyAllocationFailureConsumer); + } +}