This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2f83e1956eccecaa2371b21bddaf7778bb4f819
Author: Lijie Wang <[email protected]>
AuthorDate: Mon Jul 4 14:40:51 2022 +0800

    [FLINK-28144][runtime] Introduce BlocklistDeclarativeSlotPool.
    
    BlocklistDeclarativeSlotPool is an implementation of DeclarativeSlotPool 
that helps support the blocklist mechanism; it avoids allocating slots that 
are located on blocked nodes.
---
 .../blocklist/BlockedTaskManagerChecker.java       |  32 +++
 .../DefaultSlotPoolServiceSchedulerFactory.java    |   6 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   4 +-
 .../jobmaster/SlotPoolServiceSchedulerFactory.java |   5 +-
 .../jobmaster/slotpool/AllocatedSlotPool.java      |   8 +
 .../slotpool/BlocklistDeclarativeSlotPool.java     | 124 +++++++++++
 .../BlocklistDeclarativeSlotPoolFactory.java       |  55 +++++
 .../jobmaster/slotpool/DeclarativeSlotPool.java    |  10 +-
 .../slotpool/DeclarativeSlotPoolBridge.java        |   3 +-
 .../DeclarativeSlotPoolBridgeServiceFactory.java   |   5 +-
 .../DeclarativeSlotPoolServiceFactory.java         |   5 +-
 .../slotpool/DefaultAllocatedSlotPool.java         |   5 +
 .../slotpool/DefaultDeclarativeSlotPool.java       |   5 +-
 .../jobmaster/slotpool/SlotPoolServiceFactory.java |   3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   4 +-
 .../slotpool/BlocklistDeclarativeSlotPoolTest.java | 246 +++++++++++++++++++++
 .../slotpool/DeclarativeSlotPoolBridgeTest.java    |   7 +-
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   |  12 +-
 .../jobmaster/slotpool/SlotPoolTestUtils.java      |  10 +
 .../slotpool/TestingDeclarativeSlotPool.java       |  19 +-
 .../TestingDeclarativeSlotPoolBuilder.java         |  18 +-
 .../slotpool/TestingSlotPoolServiceBuilder.java    |   3 +-
 22 files changed, 551 insertions(+), 38 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java
new file mode 100644
index 00000000000..c6e5f377bf5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/** This checker helps to query whether a given task manager is blocked. */
+public interface BlockedTaskManagerChecker {
+
+    /**
+     * Returns whether the given task manager is located on a blocked node.
+     *
+     * @param resourceID ID of the task manager to query
+     * @return True if the given task manager is located on a blocked node, 
otherwise false.
+     */
+    boolean isBlockedTaskManager(ResourceID resourceID);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 58795fc5216..0b3757a761e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeServiceFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
 import 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolServiceFactory;
 import 
org.apache.flink.runtime.jobmaster.slotpool.PreferredAllocationRequestSlotMatchingStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
@@ -81,8 +82,9 @@ public final class DefaultSlotPoolServiceSchedulerFactory
     }
 
     @Override
-    public SlotPoolService createSlotPoolService(JobID jid) {
-        return slotPoolServiceFactory.createSlotPoolService(jid);
+    public SlotPoolService createSlotPoolService(
+            JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
+        return slotPoolServiceFactory.createSlotPoolService(jid, 
declarativeSlotPoolFactory);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 476802ca7f7..3d11f835e36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -302,7 +303,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
                 highAvailabilityServices.getResourceManagerLeaderRetriever();
 
         this.slotPoolService =
-                
checkNotNull(slotPoolServiceSchedulerFactory).createSlotPoolService(jid);
+                checkNotNull(slotPoolServiceSchedulerFactory)
+                        .createSlotPoolService(jid, new 
DefaultDeclarativeSlotPoolFactory());
 
         this.registeredTaskManagers = new HashMap<>(4);
         this.partitionTracker =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
index b655602b168..d97dc577b41 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -46,9 +47,11 @@ public interface SlotPoolServiceSchedulerFactory {
      * Creates a {@link SlotPoolService}.
      *
      * @param jid jid is the JobID to pass to the service
+     * @param declarativeSlotPoolFactory the declarative slot pool factory
      * @return created SlotPoolService
      */
-    SlotPoolService createSlotPoolService(JobID jid);
+    SlotPoolService createSlotPoolService(
+            JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory);
 
     /**
      * Returns the scheduler type this factory is creating.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
index 6f426468df5..092963c00d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
@@ -102,6 +102,14 @@ public interface AllocatedSlotPool {
      */
     Optional<AllocatedSlot> freeReservedSlot(AllocationID allocationId, long 
currentTime);
 
+    /**
+     * Returns slot information specified by the given allocationId.
+     *
+     * @return the slot information if there was a slot with the given 
allocationId; otherwise
+     *     {@link Optional#empty()}
+     */
+    Optional<SlotInfo> getSlotInformation(AllocationID allocationID);
+
     /**
      * Returns information about all currently free slots.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
new file mode 100644
index 00000000000..a01fc70a6da
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DeclarativeSlotPool} implementation that supports blocklist. This 
implementation will
+ * avoid allocating slots that are located on blocked nodes. The core idea is to 
keep the slot pool in
+ * such a state: there is no slot in slot pool that is free (no task assigned) 
and located on
+ * blocked nodes.
+ */
+public class BlocklistDeclarativeSlotPool extends DefaultDeclarativeSlotPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlocklistDeclarativeSlotPool.class);
+
+    private final BlockedTaskManagerChecker blockedTaskManagerChecker;
+
+    BlocklistDeclarativeSlotPool(
+            JobID jobId,
+            AllocatedSlotPool slotPool,
+            Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+            BlockedTaskManagerChecker blockedTaskManagerChecker,
+            Time idleSlotTimeout,
+            Time rpcTimeout) {
+        super(jobId, slotPool, notifyNewResourceRequirements, idleSlotTimeout, 
rpcTimeout);
+        this.blockedTaskManagerChecker = 
checkNotNull(blockedTaskManagerChecker);
+    }
+
+    @Override
+    public Collection<SlotOffer> offerSlots(
+            Collection<? extends SlotOffer> offers,
+            TaskManagerLocation taskManagerLocation,
+            TaskManagerGateway taskManagerGateway,
+            long currentTime) {
+        if (!isBlockedTaskManager(taskManagerLocation.getResourceID())) {
+            return super.offerSlots(offers, taskManagerLocation, 
taskManagerGateway, currentTime);
+        } else {
+            LOG.debug(
+                    "Reject slots {} from a blocked TaskManager {}.", offers, 
taskManagerLocation);
+            return Collections.emptySet();
+        }
+    }
+
+    @Override
+    public Collection<SlotOffer> registerSlots(
+            Collection<? extends SlotOffer> slots,
+            TaskManagerLocation taskManagerLocation,
+            TaskManagerGateway taskManagerGateway,
+            long currentTime) {
+        if (!isBlockedTaskManager(taskManagerLocation.getResourceID())) {
+            return super.registerSlots(slots, taskManagerLocation, 
taskManagerGateway, currentTime);
+        } else {
+            LOG.debug("Reject slots {} from a blocked TaskManager {}.", slots, 
taskManagerLocation);
+            return Collections.emptySet();
+        }
+    }
+
+    @Override
+    public ResourceCounter freeReservedSlot(
+            AllocationID allocationId, @Nullable Throwable cause, long 
currentTime) {
+        Optional<SlotInfo> slotInfo = 
slotPool.getSlotInformation(allocationId);
+
+        if (!slotInfo.isPresent()) {
+            return ResourceCounter.empty();
+        }
+
+        ResourceID taskManagerId = 
slotInfo.get().getTaskManagerLocation().getResourceID();
+        if (!isBlockedTaskManager(taskManagerId)) {
+            return super.freeReservedSlot(allocationId, cause, currentTime);
+        } else {
+            LOG.debug("Free reserved slot {}.", allocationId);
+            return releaseSlot(
+                    allocationId,
+                    new FlinkRuntimeException(
+                            String.format(
+                                    "Free reserved slot %s on blocked task 
manager %s.",
+                                    allocationId, 
taskManagerId.getStringWithMetadata())));
+        }
+    }
+
+    private boolean isBlockedTaskManager(ResourceID resourceID) {
+        return blockedTaskManagerChecker.isBlockedTaskManager(resourceID);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
new file mode 100644
index 00000000000..b3a4db27d83
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Factory for {@link BlocklistDeclarativeSlotPool}. */
+public class BlocklistDeclarativeSlotPoolFactory implements 
DeclarativeSlotPoolFactory {
+
+    private final BlockedTaskManagerChecker blockedTaskManagerChecker;
+
+    public BlocklistDeclarativeSlotPoolFactory(
+            BlockedTaskManagerChecker blockedTaskManagerChecker) {
+        this.blockedTaskManagerChecker = 
checkNotNull(blockedTaskManagerChecker);
+    }
+
+    @Override
+    public DeclarativeSlotPool create(
+            JobID jobId,
+            Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+            Time idleSlotTimeout,
+            Time rpcTimeout) {
+        return new BlocklistDeclarativeSlotPool(
+                jobId,
+                new DefaultAllocatedSlotPool(),
+                notifyNewResourceRequirements,
+                blockedTaskManagerChecker,
+                idleSlotTimeout,
+                rpcTimeout);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
index d2a9bee6b01..9cd89064275 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
@@ -87,15 +87,21 @@ public interface DeclarativeSlotPool {
             long currentTime);
 
     /**
-     * Registers the given set of slots at the slot pool.
+     * Registers the given set of slots at the slot pool. The slot pool will 
try to accept all slots
+     * unless the slot is unavailable (for example, the TaskManager is blocked).
+     *
+     * <p>The difference from {@link #offerSlots} is that this method allows 
accepting slots which
+     * exceed the current requirements, whereas {@link #offerSlots} only accepts 
those slots that are
+     * currently required.
      *
      * @param slots slots to register
      * @param taskManagerLocation taskManagerLocation is the location of the 
offering TaskExecutor
      * @param taskManagerGateway taskManagerGateway is the gateway to talk to 
the offering
      *     TaskExecutor
      * @param currentTime currentTime is the time the slots are being offered
+     * @return the successfully registered slots; the other slot offers are 
implicitly rejected
      */
-    void registerSlots(
+    Collection<SlotOffer> registerSlots(
             Collection<? extends SlotOffer> slots,
             TaskManagerLocation taskManagerLocation,
             TaskManagerGateway taskManagerGateway,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 3bc2c6c1420..6391657b18a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -153,14 +153,13 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         }
 
         if (isJobRestarting) {
-            getDeclarativeSlotPool()
+            return getDeclarativeSlotPool()
                     .registerSlots(
                             offers,
                             taskManagerLocation,
                             taskManagerGateway,
                             getRelativeTimeMillis());
 
-            return offers;
         } else {
             return getDeclarativeSlotPool()
                     .offerSlots(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
index a298a647080..339f494272e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
@@ -41,10 +41,11 @@ public class DeclarativeSlotPoolBridgeServiceFactory 
extends AbstractSlotPoolSer
 
     @Nonnull
     @Override
-    public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) {
+    public SlotPoolService createSlotPoolService(
+            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
         return new DeclarativeSlotPoolBridge(
                 jobId,
-                new DefaultDeclarativeSlotPoolFactory(),
+                declarativeSlotPoolFactory,
                 clock,
                 rpcTimeout,
                 slotIdleTimeout,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
index c978b69f85e..af8e3d4d801 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
@@ -39,8 +39,9 @@ public class DeclarativeSlotPoolServiceFactory implements 
SlotPoolServiceFactory
 
     @Nonnull
     @Override
-    public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) {
+    public SlotPoolService createSlotPoolService(
+            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
         return new DeclarativeSlotPoolService(
-                jobId, new DefaultDeclarativeSlotPoolFactory(), clock, 
idleSlotTimeout, rpcTimeout);
+                jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, 
rpcTimeout);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
index 8006b296895..f5d426be2f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
@@ -178,6 +178,11 @@ public class DefaultAllocatedSlotPool implements 
AllocatedSlotPool {
         }
     }
 
+    @Override
+    public Optional<SlotInfo> getSlotInformation(AllocationID allocationID) {
+        return Optional.ofNullable(registeredSlots.get(allocationID));
+    }
+
     @Override
     public Collection<FreeSlotInfo> getFreeSlotsInformation() {
         final Map<ResourceID, Integer> freeSlotsPerTaskExecutor = new 
HashMap<>();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index fece838304c..e1ae3c2a176 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -92,7 +92,7 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
     private final Time rpcTimeout;
 
     private final JobID jobId;
-    private final AllocatedSlotPool slotPool;
+    protected final AllocatedSlotPool slotPool;
 
     private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
 
@@ -233,7 +233,7 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
     }
 
     @Override
-    public void registerSlots(
+    public Collection<SlotOffer> registerSlots(
             Collection<? extends SlotOffer> slots,
             TaskManagerLocation taskManagerLocation,
             TaskManagerGateway taskManagerGateway,
@@ -253,6 +253,7 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
                 taskManagerGateway,
                 currentTime,
                 this::matchWithOutstandingRequirementOrWildcard);
+        return new ArrayList<>(slots);
     }
 
     private Optional<ResourceProfile> 
matchWithOutstandingRequirementOrWildcard(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index 7fbf8c0c24c..bfbac79569c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -26,5 +26,6 @@ import javax.annotation.Nonnull;
 public interface SlotPoolServiceFactory {
 
     @Nonnull
-    SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
+    SlotPoolService createSlotPoolService(
+            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 2a1b4eaa7a7..d7a3afbb688 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -74,6 +74,7 @@ import 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
@@ -458,7 +459,8 @@ class JobMasterTest {
 
         @Nonnull
         @Override
-        public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) {
+        public SlotPoolService createSlotPoolService(
+                @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
             return new TestingSlotPool(jobId, hasReceivedSlotOffers);
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
new file mode 100644
index 00000000000..56322e520b0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.ResourceCounter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.FreeSlotConsumer;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.NewSlotsService;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.drainNewSlotService;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool;
+import static 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BlocklistDeclarativeSlotPool}. */
+class BlocklistDeclarativeSlotPoolTest {
+
+    private static final ResourceProfile RESOURCE_PROFILE =
+            ResourceProfile.newBuilder().setCpuCores(1.7).build();
+
+    /** Slots offered from a blocked task manager must all be rejected. */
+    @Test
+    void testOfferSlotsFromBlockedTaskManager() throws Exception {
+        testOfferSlots(true);
+    }
+
+    /** Slots offered from a task manager that is not blocked must all be accepted. */
+    @Test
+    void testOfferSlotsFromUnblockedTaskManager() throws Exception {
+        testOfferSlots(false);
+    }
+
+    /**
+     * Declares resource requirements, offers matching slots from a single task manager and
+     * verifies the outcome.
+     *
+     * @param isBlocked whether the checker reports the offering task manager as blocked; if so, no
+     *     offer may be accepted and no new-slots notification may be emitted, otherwise every
+     *     offer must be accepted and every notified slot must match one of the offers
+     */
+    private void testOfferSlots(boolean isBlocked) throws Exception {
+        final TaskManagerLocation taskManager = new LocalTaskManagerLocation();
+
+        final NewSlotsService notifyNewSlots = new NewSlotsService();
+        // the checker blocks exactly this task manager if isBlocked is set, otherwise nothing.
+        final BlocklistDeclarativeSlotPool slotPool =
+                BlocklistDeclarativeSlotPoolBuilder.builder()
+                        .setBlockedTaskManagerChecker(
+                                isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false)
+                        .build();
+        slotPool.registerNewSlotsListener(notifyNewSlots);
+
+        final ResourceCounter resourceRequirements = 
createResourceRequirements();
+        slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+        // offer slots from the task manager (which is blocked only if isBlocked is set)
+        Collection<SlotOffer> slotOffers =
+                createSlotOffersForResourceRequirements(resourceRequirements);
+
+        if (isBlocked) {
+            assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, 
taskManager)).isEmpty();
+            assertThat(drainNewSlotService(notifyNewSlots)).isEmpty();
+        } else {
+            assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, 
taskManager))
+                    .containsExactlyInAnyOrderElementsOf(slotOffers);
+            // index the offers by allocation id so each notified slot can be paired with its offer
+            Map<AllocationID, SlotOffer> slotOfferMap =
+                    slotOffers.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            SlotOffer::getAllocationId, 
Function.identity()));
+            // remove() hands out each offer at most once.
+            // NOTE(review): for a notified slot without a matching offer remove() returns null and
+            // matchSlotToOffers then throws a NullPointerException instead of failing the assertion.
+            assertThat(drainNewSlotService(notifyNewSlots))
+                    .allMatch(
+                            slot ->
+                                    matchSlotToOffers(
+                                            slot, 
slotOfferMap.remove(slot.getAllocationId())));
+        }
+    }
+
+    /** Slots registered from a blocked task manager must not end up in the pool. */
+    @Test
+    void testRegisterSlotsFromBlockedTaskManager() {
+        testRegisterSlots(true);
+    }
+
+    /** Slots registered from a task manager that is not blocked must all end up in the pool. */
+    @Test
+    void testRegisterSlotsFromUnblockedTaskManager() {
+        testRegisterSlots(false);
+    }
+
+    /**
+     * Registers ten slots of {@link #RESOURCE_PROFILE} from a single task manager via {@code
+     * registerSlots} and verifies both the returned offers and the slots known to the pool.
+     *
+     * @param isBlocked whether the checker reports the registering task manager as blocked; if so,
+     *     no offer may be returned as accepted and the pool must stay empty, otherwise all offers
+     *     must be returned and the pool must contain exactly their allocation ids
+     */
+    private void testRegisterSlots(boolean isBlocked) {
+        TaskManagerLocation taskManager = new LocalTaskManagerLocation();
+        final BlocklistDeclarativeSlotPool slotPool =
+                BlocklistDeclarativeSlotPoolBuilder.builder()
+                        .setBlockedTaskManagerChecker(
+                                isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false)
+                        .build();
+
+        // note: no resource requirements are declared on the pool before the slots are registered
+        final int numberSlots = 10;
+        final Collection<SlotOffer> slotOffers =
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(RESOURCE_PROFILE, 
numberSlots));
+
+        Collection<SlotOffer> acceptedOffers =
+                slotPool.registerSlots(
+                        slotOffers,
+                        taskManager,
+                        SlotPoolTestUtils.createTaskManagerGateway(null),
+                        0);
+
+        final Collection<? extends SlotInfo> allSlotsInformation =
+                slotPool.getAllSlotsInformation();
+
+        if (isBlocked) {
+            assertThat(acceptedOffers).isEmpty();
+            assertThat(allSlotsInformation).isEmpty();
+        } else {
+            
assertThat(acceptedOffers).containsExactlyInAnyOrderElementsOf(slotOffers);
+            // compare as sets of allocation ids: the order of the pool's slots is irrelevant here
+            assertThat(
+                            allSlotsInformation.stream()
+                                    .map(SlotInfo::getAllocationId)
+                                    .collect(Collectors.toSet()))
+                    .isEqualTo(
+                            slotOffers.stream()
+                                    .map(SlotOffer::getAllocationId)
+                                    .collect(Collectors.toSet()));
+        }
+    }
+
+    /** Freeing a reserved slot whose task manager got blocked must release the slot. */
+    @Test
+    void testFreeReservedSlotsOnBlockedTaskManager() throws Exception {
+        testFreeReservedSlots(true);
+    }
+
+    /** Freeing a reserved slot on a task manager that is not blocked must keep the slot. */
+    @Test
+    void testFreeReservedSlotsOnUnblockedTaskManager() throws Exception {
+        testFreeReservedSlots(false);
+    }
+
+    /**
+     * Offers a single slot, reserves it, optionally blocks its task manager afterwards and then
+     * frees the reservation.
+     *
+     * <p>In both cases the freed reservation must be reported as having fulfilled one {@link
+     * #RESOURCE_PROFILE} requirement.
+     *
+     * @param isBlocked whether the slot's task manager is added to the blocked set before the
+     *     reservation is freed; if so, the slot must not be announced again, must be freed on the
+     *     task executor gateway and must be gone from the pool, otherwise it must be announced
+     *     again, must not be freed on the gateway and must still be in the pool
+     */
+    private void testFreeReservedSlots(boolean isBlocked) throws Exception {
+        // records the allocation ids for which the gateway's free-slot function is invoked
+        final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
+        final TestingTaskExecutorGateway testingTaskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setFreeSlotFunction(freeSlotConsumer)
+                        .createTestingTaskExecutorGateway();
+
+        final NewSlotsService notifyNewSlots = new NewSlotsService();
+        // mutable set backing the checker, so that the task manager can be blocked later on
+        Set<ResourceID> blockedTaskManagers = new HashSet<>();
+        final BlocklistDeclarativeSlotPool slotPool =
+                BlocklistDeclarativeSlotPoolBuilder.builder()
+                        
.setBlockedTaskManagerChecker(blockedTaskManagers::contains)
+                        .build();
+        slotPool.registerNewSlotsListener(notifyNewSlots);
+
+        // NOTE(review): null is passed as task manager location; presumably the helper creates
+        // one in that case - confirm against DefaultDeclarativeSlotPoolTest.
+        increaseRequirementsAndOfferSlotsToSlotPool(
+                slotPool,
+                ResourceCounter.withResource(RESOURCE_PROFILE, 1),
+                null,
+                testingTaskExecutorGateway);
+
+        final Collection<PhysicalSlot> newSlots = 
drainNewSlotService(notifyNewSlots);
+        final PhysicalSlot offeredSlot = getOnlyElement(newSlots);
+        final AllocationID allocationID = offeredSlot.getAllocationId();
+
+        slotPool.reserveFreeSlot(allocationID, RESOURCE_PROFILE);
+
+        if (isBlocked) {
+            // block the task manager only after its slot has been accepted and reserved
+            
blockedTaskManagers.add(offeredSlot.getTaskManagerLocation().getResourceID());
+        }
+
+        ResourceCounter previouslyFulfilledRequirement =
+                slotPool.freeReservedSlot(allocationID, null, 0);
+
+        // slots that the pool announced again after the reservation was freed
+        final Collection<PhysicalSlot> recycledSlots = 
drainNewSlotService(notifyNewSlots);
+
+        assertThat(previouslyFulfilledRequirement)
+                .isEqualTo(ResourceCounter.withResource(RESOURCE_PROFILE, 1));
+
+        if (isBlocked) {
+            assertThat(recycledSlots).isEmpty();
+            
assertThat(getOnlyElement(freeSlotConsumer.drainFreedSlots())).isEqualTo(allocationID);
+            assertThat(slotPool.getAllSlotsInformation()).isEmpty();
+        } else {
+            
assertThat(getOnlyElement(recycledSlots).getAllocationId()).isEqualTo(allocationID);
+            assertThat(freeSlotConsumer.drainFreedSlots()).isEmpty();
+            
assertThat(getOnlyElement(slotPool.getAllSlotsInformation()).getAllocationId())
+                    .isEqualTo(allocationID);
+        }
+    }
+
+    /**
+     * Checks whether a slot corresponds to a slot offer.
+     *
+     * @param physicalSlot the slot to check
+     * @param slotOffer the offer to compare against; must not be null, it is dereferenced
+     *     without a null check
+     * @return true if allocation id and resource profile are equal and the slot's physical slot
+     *     number equals the offer's slot index
+     */
+    private boolean matchSlotToOffers(PhysicalSlot physicalSlot, SlotOffer 
slotOffer) {
+        return 
physicalSlot.getAllocationId().equals(slotOffer.getAllocationId())
+                && 
physicalSlot.getResourceProfile().equals(slotOffer.getResourceProfile())
+                && physicalSlot.getPhysicalSlotNumber() == 
slotOffer.getSlotIndex();
+    }
+
+    /**
+     * Test builder for {@link BlocklistDeclarativeSlotPool}. Only the blocked task manager checker
+     * is configurable; all other constructor arguments are fixed in {@link #build()}.
+     */
+    private static class BlocklistDeclarativeSlotPoolBuilder {
+        // by default no task manager is considered blocked
+        private BlockedTaskManagerChecker blockedTaskManagerChecker = 
resourceID -> false;
+
+        /** Sets the checker deciding which task managers are blocked and returns this builder. */
+        public BlocklistDeclarativeSlotPoolBuilder 
setBlockedTaskManagerChecker(
+                BlockedTaskManagerChecker blockedTaskManagerChecker) {
+            this.blockedTaskManagerChecker = blockedTaskManagerChecker;
+            return this;
+        }
+
+        /**
+         * Creates the pool with a fresh {@link JobID}, a {@link DefaultAllocatedSlotPool}, a no-op
+         * consumer, the configured checker and two timeouts of 20 seconds each.
+         *
+         * <p>NOTE(review): the no-op consumer presumably receives resource requirement
+         * notifications and the two timeouts are presumably the idle slot and RPC timeouts -
+         * confirm against the constructor of BlocklistDeclarativeSlotPool.
+         */
+        public BlocklistDeclarativeSlotPool build() {
+            return new BlocklistDeclarativeSlotPool(
+                    new JobID(),
+                    new DefaultAllocatedSlotPool(),
+                    ignored -> {},
+                    blockedTaskManagerChecker,
+                    Time.seconds(20),
+                    Time.seconds(20));
+        }
+
+        /** Creates a new builder with default settings. */
+        public static BlocklistDeclarativeSlotPoolBuilder builder() {
+            return new BlocklistDeclarativeSlotPoolBuilder();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
index 7fd875f580a..3cb2fe7fbcd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
@@ -46,6 +46,7 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -252,8 +253,10 @@ public class DeclarativeSlotPoolBridgeTest extends 
TestLogger {
                                         (slotOffers,
                                                 taskManagerLocation,
                                                 taskManagerGateway,
-                                                aLong) ->
-                                                
registerSlotsCalledFuture.complete(null)));
+                                                aLong) -> {
+                                            
registerSlotsCalledFuture.complete(null);
+                                            return new ArrayList<>(slotOffers);
+                                        }));
 
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index 3188f3c8991..8127c7eb2c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -681,7 +681,7 @@ public class DefaultDeclarativeSlotPoolTest extends 
TestLogger {
     }
 
     @Nonnull
-    private static ResourceCounter createResourceRequirements() {
+    static ResourceCounter createResourceRequirements() {
         final Map<ResourceProfile, Integer> requirements = new HashMap<>();
         requirements.put(RESOURCE_PROFILE_1, 2);
         requirements.put(RESOURCE_PROFILE_2, 1);
@@ -754,7 +754,7 @@ public class DefaultDeclarativeSlotPoolTest extends 
TestLogger {
     }
 
     @Nonnull
-    private static Collection<SlotOffer> 
increaseRequirementsAndOfferSlotsToSlotPool(
+    static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool(
             DefaultDeclarativeSlotPool slotPool,
             ResourceCounter resourceRequirements,
             @Nullable LocalTaskManagerLocation taskManagerLocation,
@@ -772,7 +772,7 @@ public class DefaultDeclarativeSlotPoolTest extends 
TestLogger {
     }
 
     @Nonnull
-    private static Collection<PhysicalSlot> 
drainNewSlotService(NewSlotsService notifyNewSlots)
+    static Collection<PhysicalSlot> drainNewSlotService(NewSlotsService 
notifyNewSlots)
             throws InterruptedException {
         final Collection<PhysicalSlot> newSlots = new ArrayList<>();
 
@@ -807,7 +807,7 @@ public class DefaultDeclarativeSlotPoolTest extends 
TestLogger {
         }
     }
 
-    private static final class NewSlotsService implements 
DeclarativeSlotPool.NewSlotsListener {
+    static final class NewSlotsService implements 
DeclarativeSlotPool.NewSlotsListener {
 
         private final BlockingQueue<Collection<? extends PhysicalSlot>> 
physicalSlotsQueue =
                 new ArrayBlockingQueue<>(2);
@@ -854,7 +854,7 @@ public class DefaultDeclarativeSlotPoolTest extends 
TestLogger {
         }
     }
 
-    private static class FreeSlotConsumer
+    static class FreeSlotConsumer
             implements BiFunction<AllocationID, Throwable, 
CompletableFuture<Acknowledge>> {
 
         final BlockingQueue<AllocationID> freedSlots = new 
ArrayBlockingQueue<>(10);
@@ -866,7 +866,7 @@ public class DefaultDeclarativeSlotPoolTest extends 
TestLogger {
             return CompletableFuture.completedFuture(Acknowledge.get());
         }
 
-        private Collection<AllocationID> drainFreedSlots() {
+        Collection<AllocationID> drainFreedSlots() {
             final Collection<AllocationID> result = new ArrayList<>();
 
             freedSlots.drainTo(result);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
index 95a6f225634..3754ac427e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -53,6 +54,15 @@ public final class SlotPoolTestUtils {
         return offerSlots(slotPool, slotOffers, 
createTaskManagerGateway(null));
     }
 
+    /**
+     * Offers the given slots to the slot pool on behalf of the given task manager location, using
+     * a gateway obtained from {@code createTaskManagerGateway(null)} and a current time of 0.
+     *
+     * @param slotPool pool to offer the slots to
+     * @param slotOffers slots to offer
+     * @param taskManagerLocation location of the task manager the slots are offered from
+     * @return the result of {@code slotPool.offerSlots}
+     */
+    @Nonnull
+    public static Collection<SlotOffer> offerSlots(
+            DeclarativeSlotPool slotPool,
+            Collection<? extends SlotOffer> slotOffers,
+            TaskManagerLocation taskManagerLocation) {
+        return slotPool.offerSlots(
+                slotOffers, taskManagerLocation, 
createTaskManagerGateway(null), 0);
+    }
+
     @Nonnull
     public static Collection<SlotOffer> offerSlots(
             DeclarativeSlotPool slotPool,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index 2d8f4018181..b3c49bc8cb7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.function.QuadConsumer;
 import org.apache.flink.util.function.QuadFunction;
 import org.apache.flink.util.function.TriFunction;
 
@@ -57,8 +56,12 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
                     Collection<SlotOffer>>
             offerSlotsFunction;
 
-    private final QuadConsumer<
-                    Collection<? extends SlotOffer>, TaskManagerLocation, 
TaskManagerGateway, Long>
+    private final QuadFunction<
+                    Collection<? extends SlotOffer>,
+                    TaskManagerLocation,
+                    TaskManagerGateway,
+                    Long,
+                    Collection<SlotOffer>>
             registerSlotsFunction;
 
     private final Supplier<Collection<SlotInfoWithUtilization>> 
getFreeSlotsInformationSupplier;
@@ -93,11 +96,12 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
                             Long,
                             Collection<SlotOffer>>
                     offerSlotsFunction,
-            QuadConsumer<
+            QuadFunction<
                             Collection<? extends SlotOffer>,
                             TaskManagerLocation,
                             TaskManagerGateway,
-                            Long>
+                            Long,
+                            Collection<SlotOffer>>
                     registerSlotsFunction,
             Supplier<Collection<SlotInfoWithUtilization>> 
getFreeSlotsInformationSupplier,
             Supplier<Collection<? extends SlotInfo>> 
getAllSlotsInformationSupplier,
@@ -157,12 +161,13 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
     }
 
     @Override
-    public void registerSlots(
+    public Collection<SlotOffer> registerSlots(
             Collection<? extends SlotOffer> slots,
             TaskManagerLocation taskManagerLocation,
             TaskManagerGateway taskManagerGateway,
             long currentTime) {
-        registerSlotsFunction.accept(slots, taskManagerLocation, 
taskManagerGateway, currentTime);
+        return registerSlotsFunction.apply(
+                slots, taskManagerLocation, taskManagerGateway, currentTime);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index ee705fec923..99a02324b29 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -27,10 +27,10 @@ import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.function.QuadConsumer;
 import org.apache.flink.util.function.QuadFunction;
 import org.apache.flink.util.function.TriFunction;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.function.BiFunction;
@@ -70,9 +70,14 @@ public class TestingDeclarativeSlotPoolBuilder {
     private LongConsumer returnIdleSlotsConsumer = ignored -> {};
     private Consumer<ResourceCounter> setResourceRequirementsConsumer = 
ignored -> {};
     private Function<AllocationID, Boolean> containsFreeSlotFunction = ignored 
-> false;
-    private QuadConsumer<
-                    Collection<? extends SlotOffer>, TaskManagerLocation, 
TaskManagerGateway, Long>
-            registerSlotsFunction = (ignoredA, ignoredB, ignoredC, ignoredD) 
-> {};
+    private QuadFunction<
+                    Collection<? extends SlotOffer>,
+                    TaskManagerLocation,
+                    TaskManagerGateway,
+                    Long,
+                    Collection<SlotOffer>>
+            registerSlotsFunction =
+                    (slotOffers, ignoredB, ignoredC, ignoredD) -> new 
ArrayList<>(slotOffers);
 
     public TestingDeclarativeSlotPoolBuilder 
setIncreaseResourceRequirementsByConsumer(
             Consumer<ResourceCounter> increaseResourceRequirementsByConsumer) {
@@ -111,11 +116,12 @@ public class TestingDeclarativeSlotPoolBuilder {
     }
 
     public TestingDeclarativeSlotPoolBuilder setRegisterSlotsFunction(
-            QuadConsumer<
+            QuadFunction<
                             Collection<? extends SlotOffer>,
                             TaskManagerLocation,
                             TaskManagerGateway,
-                            Long>
+                            Long,
+                            Collection<SlotOffer>>
                     registerSlotsFunction) {
         this.registerSlotsFunction = registerSlotsFunction;
         return this;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
index 598905caf57..aed1790d597 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
@@ -70,7 +70,8 @@ public class TestingSlotPoolServiceBuilder implements 
SlotPoolServiceFactory {
 
     @Nonnull
     @Override
-    public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) {
+    public SlotPoolService createSlotPoolService(
+            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
         return new TestingSlotPoolService(
                 jobId,
                 startConsumer,

Reply via email to