This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2f83e1956eccecaa2371b21bddaf7778bb4f819 Author: Lijie Wang <[email protected]> AuthorDate: Mon Jul 4 14:40:51 2022 +0800 [FLINK-28144][runtime] Introduce BlocklistDeclarativeSlotPool. BlocklistDeclarativeSlotPool is an implementation of DeclarativeSlotPool that help to support blocklist mechanism, it will avoid allocating slots that located on blocked nodes. --- .../blocklist/BlockedTaskManagerChecker.java | 32 +++ .../DefaultSlotPoolServiceSchedulerFactory.java | 6 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 4 +- .../jobmaster/SlotPoolServiceSchedulerFactory.java | 5 +- .../jobmaster/slotpool/AllocatedSlotPool.java | 8 + .../slotpool/BlocklistDeclarativeSlotPool.java | 124 +++++++++++ .../BlocklistDeclarativeSlotPoolFactory.java | 55 +++++ .../jobmaster/slotpool/DeclarativeSlotPool.java | 10 +- .../slotpool/DeclarativeSlotPoolBridge.java | 3 +- .../DeclarativeSlotPoolBridgeServiceFactory.java | 5 +- .../DeclarativeSlotPoolServiceFactory.java | 5 +- .../slotpool/DefaultAllocatedSlotPool.java | 5 + .../slotpool/DefaultDeclarativeSlotPool.java | 5 +- .../jobmaster/slotpool/SlotPoolServiceFactory.java | 3 +- .../flink/runtime/jobmaster/JobMasterTest.java | 4 +- .../slotpool/BlocklistDeclarativeSlotPoolTest.java | 246 +++++++++++++++++++++ .../slotpool/DeclarativeSlotPoolBridgeTest.java | 7 +- .../slotpool/DefaultDeclarativeSlotPoolTest.java | 12 +- .../jobmaster/slotpool/SlotPoolTestUtils.java | 10 + .../slotpool/TestingDeclarativeSlotPool.java | 19 +- .../TestingDeclarativeSlotPoolBuilder.java | 18 +- .../slotpool/TestingSlotPoolServiceBuilder.java | 3 +- 22 files changed, 551 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java new file mode 100644 index 00000000000..c6e5f377bf5 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +/** This checker helps to query whether a given task manager is blocked. */ +public interface BlockedTaskManagerChecker { + + /** + * Returns whether the given task manager is located on a blocked node. + * + * @param resourceID ID of the task manager to query + * @return True if the given task manager is located on a blocked node, otherwise false. 
+ */ + boolean isBlockedTaskManager(ResourceID resourceID); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 58795fc5216..0b3757a761e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeServiceFactory; +import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory; import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolServiceFactory; import org.apache.flink.runtime.jobmaster.slotpool.PreferredAllocationRequestSlotMatchingStrategy; import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy; @@ -81,8 +82,9 @@ public final class DefaultSlotPoolServiceSchedulerFactory } @Override - public SlotPoolService createSlotPoolService(JobID jid) { - return slotPoolServiceFactory.createSlotPoolService(jid); + public SlotPoolService createSlotPoolService( + JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { + return slotPoolServiceFactory.createSlotPoolService(jid, declarativeSlotPoolFactory); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 476802ca7f7..3d11f835e36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ 
-51,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; +import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolFactory; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -302,7 +303,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> highAvailabilityServices.getResourceManagerLeaderRetriever(); this.slotPoolService = - checkNotNull(slotPoolServiceSchedulerFactory).createSlotPoolService(jid); + checkNotNull(slotPoolServiceSchedulerFactory) + .createSlotPoolService(jid, new DefaultDeclarativeSlotPoolFactory()); this.registeredTaskManagers = new HashMap<>(4); this.partitionTracker = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java index b655602b168..d97dc577b41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; import 
org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -46,9 +47,11 @@ public interface SlotPoolServiceSchedulerFactory { * Creates a {@link SlotPoolService}. * * @param jid jid is the JobID to pass to the service + * @param declarativeSlotPoolFactory the declarative slot pool factory * @return created SlotPoolService */ - SlotPoolService createSlotPoolService(JobID jid); + SlotPoolService createSlotPoolService( + JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory); /** * Returns the scheduler type this factory is creating. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java index 6f426468df5..092963c00d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java @@ -102,6 +102,14 @@ public interface AllocatedSlotPool { */ Optional<AllocatedSlot> freeReservedSlot(AllocationID allocationId, long currentTime); + /** + * Returns slot information specified by the given allocationId. + * + * @return the slot information if there was a slot with the given allocationId; otherwise + * {@link Optional#empty()} + */ + Optional<SlotInfo> getSlotInformation(AllocationID allocationID); + /** * Returns information about all currently free slots. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java new file mode 100644 index 00000000000..a01fc70a6da --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.FlinkRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link DeclarativeSlotPool} implementation that supports blocklist. This implementation will + * avoid allocating slots that are located on blocked nodes. The core idea is to keep the slot pool in + * such a state: there is no slot in slot pool that is free (no task assigned) and located on + * blocked nodes. + */ +public class BlocklistDeclarativeSlotPool extends DefaultDeclarativeSlotPool { + + private static final Logger LOG = LoggerFactory.getLogger(BlocklistDeclarativeSlotPool.class); + + private final BlockedTaskManagerChecker blockedTaskManagerChecker; + + BlocklistDeclarativeSlotPool( + JobID jobId, + AllocatedSlotPool slotPool, + Consumer<? 
super Collection<ResourceRequirement>> notifyNewResourceRequirements, + BlockedTaskManagerChecker blockedTaskManagerChecker, + Time idleSlotTimeout, + Time rpcTimeout) { + super(jobId, slotPool, notifyNewResourceRequirements, idleSlotTimeout, rpcTimeout); + this.blockedTaskManagerChecker = checkNotNull(blockedTaskManagerChecker); + } + + @Override + public Collection<SlotOffer> offerSlots( + Collection<? extends SlotOffer> offers, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + if (!isBlockedTaskManager(taskManagerLocation.getResourceID())) { + return super.offerSlots(offers, taskManagerLocation, taskManagerGateway, currentTime); + } else { + LOG.debug( + "Reject slots {} from a blocked TaskManager {}.", offers, taskManagerLocation); + return Collections.emptySet(); + } + } + + @Override + public Collection<SlotOffer> registerSlots( + Collection<? extends SlotOffer> slots, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + if (!isBlockedTaskManager(taskManagerLocation.getResourceID())) { + return super.registerSlots(slots, taskManagerLocation, taskManagerGateway, currentTime); + } else { + LOG.debug("Reject slots {} from a blocked TaskManager {}.", slots, taskManagerLocation); + return Collections.emptySet(); + } + } + + @Override + public ResourceCounter freeReservedSlot( + AllocationID allocationId, @Nullable Throwable cause, long currentTime) { + Optional<SlotInfo> slotInfo = slotPool.getSlotInformation(allocationId); + + if (!slotInfo.isPresent()) { + return ResourceCounter.empty(); + } + + ResourceID taskManagerId = slotInfo.get().getTaskManagerLocation().getResourceID(); + if (!isBlockedTaskManager(taskManagerId)) { + return super.freeReservedSlot(allocationId, cause, currentTime); + } else { + LOG.debug("Free reserved slot {}.", allocationId); + return releaseSlot( + allocationId, + new FlinkRuntimeException( + String.format( + "Free reserved 
slot %s on blocked task manager %s.", + allocationId, taskManagerId.getStringWithMetadata()))); + } + } + + private boolean isBlockedTaskManager(ResourceID resourceID) { + return blockedTaskManagerChecker.isBlockedTaskManager(resourceID); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java new file mode 100644 index 00000000000..b3a4db27d83 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; +import org.apache.flink.runtime.slots.ResourceRequirement; + +import java.util.Collection; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Factory for {@link BlocklistDeclarativeSlotPool}. 
*/ +public class BlocklistDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFactory { + + private final BlockedTaskManagerChecker blockedTaskManagerChecker; + + public BlocklistDeclarativeSlotPoolFactory( + BlockedTaskManagerChecker blockedTaskManagerChecker) { + this.blockedTaskManagerChecker = checkNotNull(blockedTaskManagerChecker); + } + + @Override + public DeclarativeSlotPool create( + JobID jobId, + Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, + Time idleSlotTimeout, + Time rpcTimeout) { + return new BlocklistDeclarativeSlotPool( + jobId, + new DefaultAllocatedSlotPool(), + notifyNewResourceRequirements, + blockedTaskManagerChecker, + idleSlotTimeout, + rpcTimeout); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index d2a9bee6b01..9cd89064275 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -87,15 +87,21 @@ public interface DeclarativeSlotPool { long currentTime); /** - * Registers the given set of slots at the slot pool. + * Registers the given set of slots at the slot pool. The slot pool will try to accept all slots + * unless the slot is unavailable (for example, the TaskManager is blocked). + * + * <p>The difference from {@link #offerSlots} is that this method allows accepting slots which + * exceed what is currently required, but the {@link #offerSlots} only accepts those slots that are + * currently required. 
* * @param slots slots to register * @param taskManagerLocation taskManagerLocation is the location of the offering TaskExecutor * @param taskManagerGateway taskManagerGateway is the gateway to talk to the offering * TaskExecutor * @param currentTime currentTime is the time the slots are being offered + * @return the successfully registered slots; the other slot offers are implicitly rejected */ - void registerSlots( + Collection<SlotOffer> registerSlots( Collection<? extends SlotOffer> slots, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 3bc2c6c1420..6391657b18a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -153,14 +153,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem } if (isJobRestarting) { - getDeclarativeSlotPool() + return getDeclarativeSlotPool() .registerSlots( offers, taskManagerLocation, taskManagerGateway, getRelativeTimeMillis()); - return offers; } else { return getDeclarativeSlotPool() .offerSlots( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java index a298a647080..339f494272e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java @@ -41,10 +41,11 @@ public class 
DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolSer @Nonnull @Override - public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) { + public SlotPoolService createSlotPoolService( + @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { return new DeclarativeSlotPoolBridge( jobId, - new DefaultDeclarativeSlotPoolFactory(), + declarativeSlotPoolFactory, clock, rpcTimeout, slotIdleTimeout, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java index c978b69f85e..af8e3d4d801 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java @@ -39,8 +39,9 @@ public class DeclarativeSlotPoolServiceFactory implements SlotPoolServiceFactory @Nonnull @Override - public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) { + public SlotPoolService createSlotPoolService( + @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { return new DeclarativeSlotPoolService( - jobId, new DefaultDeclarativeSlotPoolFactory(), clock, idleSlotTimeout, rpcTimeout); + jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java index 8006b296895..f5d426be2f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java @@ -178,6 +178,11 @@ public class 
DefaultAllocatedSlotPool implements AllocatedSlotPool { } } + @Override + public Optional<SlotInfo> getSlotInformation(AllocationID allocationID) { + return Optional.ofNullable(registeredSlots.get(allocationID)); + } + @Override public Collection<FreeSlotInfo> getFreeSlotsInformation() { final Map<ResourceID, Integer> freeSlotsPerTaskExecutor = new HashMap<>(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index fece838304c..e1ae3c2a176 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -92,7 +92,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { private final Time rpcTimeout; private final JobID jobId; - private final AllocatedSlotPool slotPool; + protected final AllocatedSlotPool slotPool; private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings; @@ -233,7 +233,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { } @Override - public void registerSlots( + public Collection<SlotOffer> registerSlots( Collection<? 
extends SlotOffer> slots, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, @@ -253,6 +253,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { taskManagerGateway, currentTime, this::matchWithOutstandingRequirementOrWildcard); + return new ArrayList<>(slots); } private Optional<ResourceProfile> matchWithOutstandingRequirementOrWildcard( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java index 7fbf8c0c24c..bfbac79569c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java @@ -26,5 +26,6 @@ import javax.annotation.Nonnull; public interface SlotPoolServiceFactory { @Nonnull - SlotPoolService createSlotPoolService(@Nonnull JobID jobId); + SlotPoolService createSlotPoolService( + @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 2a1b4eaa7a7..d7a3afbb688 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -74,6 +74,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import 
org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; @@ -458,7 +459,8 @@ class JobMasterTest { @Nonnull @Override - public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) { + public SlotPoolService createSlotPoolService( + @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { return new TestingSlotPool(jobId, hasReceivedSlotOffers); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java new file mode 100644 index 00000000000..56322e520b0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.ResourceCounter; + +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.FreeSlotConsumer; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.NewSlotsService; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createResourceRequirements; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.drainNewSlotService; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool; +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for 
{@link BlocklistDeclarativeSlotPool}. */ +class BlocklistDeclarativeSlotPoolTest { + + private static final ResourceProfile RESOURCE_PROFILE = + ResourceProfile.newBuilder().setCpuCores(1.7).build(); + + @Test + void testOfferSlotsFromBlockedTaskManager() throws Exception { + testOfferSlots(true); + } + + @Test + void testOfferSlotsFromUnblockedTaskManager() throws Exception { + testOfferSlots(false); + } + + private void testOfferSlots(boolean isBlocked) throws Exception { + final TaskManagerLocation taskManager = new LocalTaskManagerLocation(); + + final NewSlotsService notifyNewSlots = new NewSlotsService(); + // mark task manager as blocked. + final BlocklistDeclarativeSlotPool slotPool = + BlocklistDeclarativeSlotPoolBuilder.builder() + .setBlockedTaskManagerChecker( + isBlocked ? taskManager.getResourceID()::equals : ignore -> false) + .build(); + slotPool.registerNewSlotsListener(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + // offer slots on the blocked task manager + Collection<SlotOffer> slotOffers = + createSlotOffersForResourceRequirements(resourceRequirements); + + if (isBlocked) { + assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager)).isEmpty(); + assertThat(drainNewSlotService(notifyNewSlots)).isEmpty(); + } else { + assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager)) + .containsExactlyInAnyOrderElementsOf(slotOffers); + Map<AllocationID, SlotOffer> slotOfferMap = + slotOffers.stream() + .collect( + Collectors.toMap( + SlotOffer::getAllocationId, Function.identity())); + assertThat(drainNewSlotService(notifyNewSlots)) + .allMatch( + slot -> + matchSlotToOffers( + slot, slotOfferMap.remove(slot.getAllocationId()))); + } + } + + @Test + void testRegisterSlotsFromBlockedTaskManager() { + testRegisterSlots(true); + } + + @Test + void testRegisterSlotsFromUnblockedTaskManager() { + 
testRegisterSlots(false); + } + + private void testRegisterSlots(boolean isBlocked) { + TaskManagerLocation taskManager = new LocalTaskManagerLocation(); + final BlocklistDeclarativeSlotPool slotPool = + BlocklistDeclarativeSlotPoolBuilder.builder() + .setBlockedTaskManagerChecker( + isBlocked ? taskManager.getResourceID()::equals : ignore -> false) + .build(); + + final int numberSlots = 10; + final Collection<SlotOffer> slotOffers = + createSlotOffersForResourceRequirements( + ResourceCounter.withResource(RESOURCE_PROFILE, numberSlots)); + + Collection<SlotOffer> acceptedOffers = + slotPool.registerSlots( + slotOffers, + taskManager, + SlotPoolTestUtils.createTaskManagerGateway(null), + 0); + + final Collection<? extends SlotInfo> allSlotsInformation = + slotPool.getAllSlotsInformation(); + + if (isBlocked) { + assertThat(acceptedOffers).isEmpty(); + assertThat(allSlotsInformation).isEmpty(); + } else { + assertThat(acceptedOffers).containsExactlyInAnyOrderElementsOf(slotOffers); + assertThat( + allSlotsInformation.stream() + .map(SlotInfo::getAllocationId) + .collect(Collectors.toSet())) + .isEqualTo( + slotOffers.stream() + .map(SlotOffer::getAllocationId) + .collect(Collectors.toSet())); + } + } + + @Test + void testFreeReservedSlotsOnBlockedTaskManager() throws Exception { + testFreeReservedSlots(true); + } + + @Test + void testFreeReservedSlotsOnUnblockedTaskManager() throws Exception { + testFreeReservedSlots(false); + } + + private void testFreeReservedSlots(boolean isBlocked) throws Exception { + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + final NewSlotsService notifyNewSlots = new NewSlotsService(); + Set<ResourceID> blockedTaskManagers = new HashSet<>(); + final BlocklistDeclarativeSlotPool slotPool = + 
BlocklistDeclarativeSlotPoolBuilder.builder() + .setBlockedTaskManagerChecker(blockedTaskManagers::contains) + .build(); + slotPool.registerNewSlotsListener(notifyNewSlots); + + increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + ResourceCounter.withResource(RESOURCE_PROFILE, 1), + null, + testingTaskExecutorGateway); + + final Collection<PhysicalSlot> newSlots = drainNewSlotService(notifyNewSlots); + final PhysicalSlot offeredSlot = getOnlyElement(newSlots); + final AllocationID allocationID = offeredSlot.getAllocationId(); + + slotPool.reserveFreeSlot(allocationID, RESOURCE_PROFILE); + + if (isBlocked) { + // block TM + blockedTaskManagers.add(offeredSlot.getTaskManagerLocation().getResourceID()); + } + + ResourceCounter previouslyFulfilledRequirement = + slotPool.freeReservedSlot(allocationID, null, 0); + + final Collection<PhysicalSlot> recycledSlots = drainNewSlotService(notifyNewSlots); + + assertThat(previouslyFulfilledRequirement) + .isEqualTo(ResourceCounter.withResource(RESOURCE_PROFILE, 1)); + + if (isBlocked) { + assertThat(recycledSlots).isEmpty(); + assertThat(getOnlyElement(freeSlotConsumer.drainFreedSlots())).isEqualTo(allocationID); + assertThat(slotPool.getAllSlotsInformation()).isEmpty(); + } else { + assertThat(getOnlyElement(recycledSlots).getAllocationId()).isEqualTo(allocationID); + assertThat(freeSlotConsumer.drainFreedSlots()).isEmpty(); + assertThat(getOnlyElement(slotPool.getAllSlotsInformation()).getAllocationId()) + .isEqualTo(allocationID); + } + } + + private boolean matchSlotToOffers(PhysicalSlot physicalSlot, SlotOffer slotOffer) { + return physicalSlot.getAllocationId().equals(slotOffer.getAllocationId()) + && physicalSlot.getResourceProfile().equals(slotOffer.getResourceProfile()) + && physicalSlot.getPhysicalSlotNumber() == slotOffer.getSlotIndex(); + } + + private static class BlocklistDeclarativeSlotPoolBuilder { + private BlockedTaskManagerChecker blockedTaskManagerChecker = resourceID -> false; + + public 
BlocklistDeclarativeSlotPoolBuilder setBlockedTaskManagerChecker( + BlockedTaskManagerChecker blockedTaskManagerChecker) { + this.blockedTaskManagerChecker = blockedTaskManagerChecker; + return this; + } + + public BlocklistDeclarativeSlotPool build() { + return new BlocklistDeclarativeSlotPool( + new JobID(), + new DefaultAllocatedSlotPool(), + ignored -> {}, + blockedTaskManagerChecker, + Time.seconds(20), + Time.seconds(20)); + } + + public static BlocklistDeclarativeSlotPoolBuilder builder() { + return new BlocklistDeclarativeSlotPoolBuilder(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java index 7fd875f580a..3cb2fe7fbcd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java @@ -46,6 +46,7 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -252,8 +253,10 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger { (slotOffers, taskManagerLocation, taskManagerGateway, - aLong) -> - registerSlotsCalledFuture.complete(null))); + aLong) -> { + registerSlotsCalledFuture.complete(null); + return new ArrayList<>(slotOffers); + })); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index 3188f3c8991..8127c7eb2c5 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -681,7 +681,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { } @Nonnull - private static ResourceCounter createResourceRequirements() { + static ResourceCounter createResourceRequirements() { final Map<ResourceProfile, Integer> requirements = new HashMap<>(); requirements.put(RESOURCE_PROFILE_1, 2); requirements.put(RESOURCE_PROFILE_2, 1); @@ -754,7 +754,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { } @Nonnull - private static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool( + static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool( DefaultDeclarativeSlotPool slotPool, ResourceCounter resourceRequirements, @Nullable LocalTaskManagerLocation taskManagerLocation, @@ -772,7 +772,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { } @Nonnull - private static Collection<PhysicalSlot> drainNewSlotService(NewSlotsService notifyNewSlots) + static Collection<PhysicalSlot> drainNewSlotService(NewSlotsService notifyNewSlots) throws InterruptedException { final Collection<PhysicalSlot> newSlots = new ArrayList<>(); @@ -807,7 +807,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { } } - private static final class NewSlotsService implements DeclarativeSlotPool.NewSlotsListener { + static final class NewSlotsService implements DeclarativeSlotPool.NewSlotsListener { private final BlockingQueue<Collection<? 
extends PhysicalSlot>> physicalSlotsQueue = new ArrayBlockingQueue<>(2); @@ -854,7 +854,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { } } - private static class FreeSlotConsumer + static class FreeSlotConsumer implements BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> { final BlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(10); @@ -866,7 +866,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger { return CompletableFuture.completedFuture(Acknowledge.get()); } - private Collection<AllocationID> drainFreedSlots() { + Collection<AllocationID> drainFreedSlots() { final Collection<AllocationID> result = new ArrayList<>(); freedSlots.drainTo(result); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java index 95a6f225634..3754ac427e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -53,6 +54,15 @@ public final class SlotPoolTestUtils { return offerSlots(slotPool, slotOffers, createTaskManagerGateway(null)); } + @Nonnull + public static Collection<SlotOffer> offerSlots( + DeclarativeSlotPool slotPool, + Collection<? 
extends SlotOffer> slotOffers, + TaskManagerLocation taskManagerLocation) { + return slotPool.offerSlots( + slotOffers, taskManagerLocation, createTaskManagerGateway(null), 0); + } + @Nonnull public static Collection<SlotOffer> offerSlots( DeclarativeSlotPool slotPool, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index 2d8f4018181..b3c49bc8cb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.ResourceCounter; -import org.apache.flink.util.function.QuadConsumer; import org.apache.flink.util.function.QuadFunction; import org.apache.flink.util.function.TriFunction; @@ -57,8 +56,12 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { Collection<SlotOffer>> offerSlotsFunction; - private final QuadConsumer< - Collection<? extends SlotOffer>, TaskManagerLocation, TaskManagerGateway, Long> + private final QuadFunction< + Collection<? extends SlotOffer>, + TaskManagerLocation, + TaskManagerGateway, + Long, + Collection<SlotOffer>> registerSlotsFunction; private final Supplier<Collection<SlotInfoWithUtilization>> getFreeSlotsInformationSupplier; @@ -93,11 +96,12 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { Long, Collection<SlotOffer>> offerSlotsFunction, - QuadConsumer< + QuadFunction< Collection<? 
extends SlotOffer>, TaskManagerLocation, TaskManagerGateway, - Long> + Long, + Collection<SlotOffer>> registerSlotsFunction, Supplier<Collection<SlotInfoWithUtilization>> getFreeSlotsInformationSupplier, Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier, @@ -157,12 +161,13 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { } @Override - public void registerSlots( + public Collection<SlotOffer> registerSlots( Collection<? extends SlotOffer> slots, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime) { - registerSlotsFunction.accept(slots, taskManagerLocation, taskManagerGateway, currentTime); + return registerSlotsFunction.apply( + slots, taskManagerLocation, taskManagerGateway, currentTime); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java index ee705fec923..99a02324b29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java @@ -27,10 +27,10 @@ import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.ResourceCounter; -import org.apache.flink.util.function.QuadConsumer; import org.apache.flink.util.function.QuadFunction; import org.apache.flink.util.function.TriFunction; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.function.BiFunction; @@ -70,9 +70,14 @@ public class TestingDeclarativeSlotPoolBuilder { private LongConsumer returnIdleSlotsConsumer = ignored -> {}; private 
Consumer<ResourceCounter> setResourceRequirementsConsumer = ignored -> {}; private Function<AllocationID, Boolean> containsFreeSlotFunction = ignored -> false; - private QuadConsumer< - Collection<? extends SlotOffer>, TaskManagerLocation, TaskManagerGateway, Long> - registerSlotsFunction = (ignoredA, ignoredB, ignoredC, ignoredD) -> {}; + private QuadFunction< + Collection<? extends SlotOffer>, + TaskManagerLocation, + TaskManagerGateway, + Long, + Collection<SlotOffer>> + registerSlotsFunction = + (slotOffers, ignoredB, ignoredC, ignoredD) -> new ArrayList<>(slotOffers); public TestingDeclarativeSlotPoolBuilder setIncreaseResourceRequirementsByConsumer( Consumer<ResourceCounter> increaseResourceRequirementsByConsumer) { @@ -111,11 +116,12 @@ public class TestingDeclarativeSlotPoolBuilder { } public TestingDeclarativeSlotPoolBuilder setRegisterSlotsFunction( - QuadConsumer< + QuadFunction< Collection<? extends SlotOffer>, TaskManagerLocation, TaskManagerGateway, - Long> + Long, + Collection<SlotOffer>> registerSlotsFunction) { this.registerSlotsFunction = registerSlotsFunction; return this; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java index 598905caf57..aed1790d597 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java @@ -70,7 +70,8 @@ public class TestingSlotPoolServiceBuilder implements SlotPoolServiceFactory { @Nonnull @Override - public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) { + public SlotPoolService createSlotPoolService( + @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { return new TestingSlotPoolService( jobId, startConsumer,
