This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f1fba33d85a [FLINK-34345][runtime] Remove TaskExecutorManager related 
logic
f1fba33d85a is described below

commit f1fba33d85a802b896170ff3cdb0107ee082c44a
Author: caicancai <[email protected]>
AuthorDate: Sat Feb 3 14:24:56 2024 +0800

    [FLINK-34345][runtime] Remove TaskExecutorManager related logic
---
 .../slotmanager/PendingTaskManagerSlot.java        |  41 --
 .../slotmanager/SlotStatusUpdateListener.java      |  45 --
 .../slotmanager/TaskExecutorManager.java           | 590 ---------------------
 .../slotmanager/TaskManagerRegistration.java       | 127 -----
 .../slotmanager/TaskManagerSlotId.java             |  33 --
 .../slotmanager/TaskExecutorManagerBuilder.java    | 107 ----
 .../slotmanager/TaskExecutorManagerTest.java       | 468 ----------------
 .../TestingTaskManagerSlotInformation.java         | 150 ------
 8 files changed, 1561 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
deleted file mode 100644
index 336c4dae956..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-
-/** Represents a pending task manager slot in the {@link SlotManager}. */
-public class PendingTaskManagerSlot {
-
-    private final TaskManagerSlotId taskManagerSlotId = 
TaskManagerSlotId.generate();
-
-    private final ResourceProfile resourceProfile;
-
-    public PendingTaskManagerSlot(ResourceProfile resourceProfile) {
-        this.resourceProfile = resourceProfile;
-    }
-
-    public TaskManagerSlotId getTaskManagerSlotId() {
-        return taskManagerSlotId;
-    }
-
-    public ResourceProfile getResourceProfile() {
-        return resourceProfile;
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java
deleted file mode 100644
index bc24e1122e2..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-
-/**
- * Interface for components that want to listen to updates to the status of a 
slot.
- *
- * <p>This interface must only be used for updating data-structures, NOT for 
initiating new resource
- * allocations. The event that caused the state transition may also have 
triggered a series of
- * transitions, which new allocations would interfere with.
- */
-interface SlotStatusUpdateListener {
-
-    /**
-     * Notification for the status of a slot having changed.
-     *
-     * <p>If the slot is being freed ({@code current == FREE} then {@code 
jobId} is that of the job
-     * the slot was allocated for. If the slot was already acquired by a job 
({@code current !=
-     * FREE}, then {@code jobId} is the ID of this very job.
-     *
-     * @param slot slot whose status has changed
-     * @param previous state before the change
-     * @param current state after the change
-     * @param jobId job for which the slot was/is allocated for
-     */
-    void notifySlotStatusChange(
-            TaskManagerSlotInformation slot, SlotState previous, SlotState 
current, JobID jobId);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
deleted file mode 100644
index 221bd44d46c..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ /dev/null
@@ -1,590 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/**
- * SlotManager component for various task executor related responsibilities of 
the slot manager,
- * including:
- *
- * <ul>
- *   <li>tracking registered task executors
- *   <li>allocating new task executors (both on-demand, and for redundancy)
- *   <li>releasing idle task executors
- *   <li>tracking pending slots (expected slots from executors that are 
currently being allocated
- *   <li>tracking how many slots are used on each task executor
- * </ul>
- *
- * <p>Dev note: This component only exists to keep the code out of the slot 
manager. It covers many
- * aspects that aren't really the responsibility of the slot manager, and 
should be refactored to
- * live outside the slot manager and split into multiple parts.
- */
-class TaskExecutorManager implements AutoCloseable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorManager.class);
-
-    private final ResourceProfile defaultSlotResourceProfile;
-
-    /** The default resource spec of workers to request. */
-    private final WorkerResourceSpec defaultWorkerResourceSpec;
-
-    private final int numSlotsPerWorker;
-
-    /** Defines the max limitation of the total number of slots. */
-    private final int maxSlotNum;
-
-    /**
-     * Release task executor only when each produced result partition is 
either consumed or failed.
-     */
-    private final boolean waitResultConsumedBeforeRelease;
-
-    /** Defines the number of redundant taskmanagers. */
-    private final int redundantTaskManagerNum;
-
-    /** Timeout after which an unused TaskManager is released. */
-    private final Time taskManagerTimeout;
-
-    /** Callbacks for resource (de-)allocations. */
-    private final ResourceAllocator resourceAllocator;
-
-    /** All currently registered task managers. */
-    private final Map<InstanceID, TaskManagerRegistration> 
taskManagerRegistrations =
-            new HashMap<>();
-
-    private final Map<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots 
= new HashMap<>();
-
-    private final Executor mainThreadExecutor;
-
-    @Nullable private final ScheduledFuture<?> 
taskManagerTimeoutsAndRedundancyCheck;
-
-    private final Set<InstanceID> unWantedWorkers;
-    private final ScheduledExecutor scheduledExecutor;
-    private final Duration declareNeededResourceDelay;
-    private CompletableFuture<Void> declareNeededResourceFuture;
-
-    TaskExecutorManager(
-            WorkerResourceSpec defaultWorkerResourceSpec,
-            int numSlotsPerWorker,
-            int maxNumSlots,
-            boolean waitResultConsumedBeforeRelease,
-            int redundantTaskManagerNum,
-            Time taskManagerTimeout,
-            Duration declareNeededResourceDelay,
-            ScheduledExecutor scheduledExecutor,
-            Executor mainThreadExecutor,
-            ResourceAllocator resourceAllocator) {
-
-        this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
-        this.numSlotsPerWorker = numSlotsPerWorker;
-        this.maxSlotNum = maxNumSlots;
-        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
-        this.redundantTaskManagerNum = redundantTaskManagerNum;
-        this.taskManagerTimeout = taskManagerTimeout;
-        this.defaultSlotResourceProfile =
-                SlotManagerUtils.generateDefaultSlotResourceProfile(
-                        defaultWorkerResourceSpec, numSlotsPerWorker);
-        this.scheduledExecutor = scheduledExecutor;
-        this.declareNeededResourceDelay = declareNeededResourceDelay;
-        this.unWantedWorkers = new HashSet<>();
-        this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
-        this.mainThreadExecutor = mainThreadExecutor;
-        if (resourceAllocator.isSupported()) {
-            taskManagerTimeoutsAndRedundancyCheck =
-                    scheduledExecutor.scheduleWithFixedDelay(
-                            () ->
-                                    mainThreadExecutor.execute(
-                                            
this::checkTaskManagerTimeoutsAndRedundancy),
-                            0L,
-                            taskManagerTimeout.toMilliseconds(),
-                            TimeUnit.MILLISECONDS);
-        } else {
-            taskManagerTimeoutsAndRedundancyCheck = null;
-        }
-    }
-
-    @Override
-    public void close() {
-        if (taskManagerTimeoutsAndRedundancyCheck != null) {
-            taskManagerTimeoutsAndRedundancyCheck.cancel(false);
-        }
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // TaskExecutor (un)registration
-    // 
---------------------------------------------------------------------------------------------
-
-    public boolean isTaskManagerRegistered(InstanceID instanceId) {
-        return taskManagerRegistrations.containsKey(instanceId);
-    }
-
-    public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection,
-            SlotReport initialSlotReport,
-            ResourceProfile totalResourceProfile,
-            ResourceProfile defaultSlotResourceProfile) {
-        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
-            LOG.info(
-                    "The total number of slots exceeds the max limitation {}, 
could not register the excess task executor {}.",
-                    maxSlotNum,
-                    taskExecutorConnection.getInstanceID());
-            return false;
-        }
-
-        TaskManagerRegistration taskManagerRegistration =
-                new TaskManagerRegistration(
-                        taskExecutorConnection,
-                        StreamSupport.stream(initialSlotReport.spliterator(), 
false)
-                                .map(SlotStatus::getSlotID)
-                                .collect(Collectors.toList()),
-                        totalResourceProfile,
-                        defaultSlotResourceProfile);
-
-        taskManagerRegistrations.put(
-                taskExecutorConnection.getInstanceID(), 
taskManagerRegistration);
-
-        // next register the new slots
-        for (SlotStatus slotStatus : initialSlotReport) {
-            if (slotStatus.getJobID() == null) {
-                
findAndRemoveExactlyMatchingPendingTaskManagerSlot(slotStatus.getResourceProfile());
-            }
-        }
-
-        return true;
-    }
-
-    private boolean isMaxSlotNumExceededAfterRegistration(SlotReport 
initialSlotReport) {
-        // check if the total number exceed before matching pending slot.
-        if 
(!isMaxSlotNumExceededAfterAdding(initialSlotReport.getNumSlotStatus())) {
-            return false;
-        }
-
-        // check if the total number exceed slots after consuming pending slot.
-        return 
isMaxSlotNumExceededAfterAdding(getNumNonPendingReportedNewSlots(initialSlotReport));
-    }
-
-    private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
-        final Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
-
-        for (SlotStatus slotStatus : slotReport) {
-            if (slotStatus.getAllocationID() != null) {
-                // only empty registered slots can match pending slots
-                continue;
-            }
-
-            for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
-                if 
(!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId())
-                        && isPendingSlotExactlyMatchingResourceProfile(
-                                pendingTaskManagerSlot, 
slotStatus.getResourceProfile())) {
-                    
matchingPendingSlots.add(pendingTaskManagerSlot.getTaskManagerSlotId());
-                    break; // pendingTaskManagerSlot loop
-                }
-            }
-        }
-        return slotReport.getNumSlotStatus() - matchingPendingSlots.size();
-    }
-
-    private void findAndRemoveExactlyMatchingPendingTaskManagerSlot(
-            ResourceProfile resourceProfile) {
-        for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
-            if (isPendingSlotExactlyMatchingResourceProfile(
-                    pendingTaskManagerSlot, resourceProfile)) {
-                
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
-                return;
-            }
-        }
-    }
-
-    private boolean isPendingSlotExactlyMatchingResourceProfile(
-            PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile 
resourceProfile) {
-        return 
pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
-    }
-
-    public void unregisterTaskExecutor(InstanceID instanceId) {
-        taskManagerRegistrations.remove(instanceId);
-        unWantedWorkers.remove(instanceId);
-    }
-
-    public Collection<InstanceID> getTaskExecutors() {
-        return new ArrayList<>(taskManagerRegistrations.keySet());
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // TaskExecutor allocation
-    // 
---------------------------------------------------------------------------------------------
-
-    /**
-     * Tries to allocate a worker that can provide a slot with the given 
resource profile.
-     *
-     * @param requestedSlotResourceProfile desired slot profile
-     * @return an upper bound resource requirement that can be fulfilled by 
the new worker, if one
-     *     was allocated
-     */
-    public Optional<ResourceRequirement> allocateWorker(
-            ResourceProfile requestedSlotResourceProfile) {
-        if (!resourceAllocator.isSupported()) {
-            // resource cannot be allocated
-            return Optional.empty();
-        }
-
-        final int numRegisteredSlots = getNumberRegisteredSlots();
-        final int numPendingSlots = getNumberPendingTaskManagerSlots();
-        if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
-            LOG.warn(
-                    "Could not allocate {} more slots. The number of 
registered and pending slots is {}, while the maximum is {}.",
-                    numSlotsPerWorker,
-                    numPendingSlots + numRegisteredSlots,
-                    maxSlotNum);
-            return Optional.empty();
-        }
-
-        if 
(!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) {
-            // requested resource profile is unfulfillable
-            return Optional.empty();
-        }
-
-        for (int i = 0; i < numSlotsPerWorker; ++i) {
-            PendingTaskManagerSlot pendingTaskManagerSlot =
-                    new PendingTaskManagerSlot(defaultSlotResourceProfile);
-            pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), 
pendingTaskManagerSlot);
-        }
-
-        declareNeededResourcesWithDelay();
-
-        return Optional.of(
-                ResourceRequirement.create(defaultSlotResourceProfile, 
numSlotsPerWorker));
-    }
-
-    private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
-        return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() 
+ numNewSlot
-                > maxSlotNum;
-    }
-
-    private Collection<ResourceDeclaration> getResourceDeclaration() {
-        final int pendingWorkerNum =
-                MathUtils.divideRoundUp(getNumberPendingTaskManagerSlots(), 
numSlotsPerWorker);
-        Set<InstanceID> neededRegisteredWorkers = new 
HashSet<>(taskManagerRegistrations.keySet());
-        neededRegisteredWorkers.removeAll(unWantedWorkers);
-        final int totalWorkerNum = pendingWorkerNum + 
neededRegisteredWorkers.size();
-
-        return Collections.singleton(
-                new ResourceDeclaration(
-                        defaultWorkerResourceSpec, totalWorkerNum, new 
HashSet<>(unWantedWorkers)));
-    }
-
-    private void declareNeededResourcesWithDelay() {
-        Preconditions.checkState(resourceAllocator.isSupported());
-
-        if (declareNeededResourceDelay.toMillis() <= 0) {
-            declareNeededResources();
-        } else {
-            if (declareNeededResourceFuture == null || 
declareNeededResourceFuture.isDone()) {
-                declareNeededResourceFuture = new CompletableFuture<>();
-                scheduledExecutor.schedule(
-                        () ->
-                                mainThreadExecutor.execute(
-                                        () -> {
-                                            declareNeededResources();
-                                            
Preconditions.checkNotNull(declareNeededResourceFuture)
-                                                    .complete(null);
-                                        }),
-                        declareNeededResourceDelay.toMillis(),
-                        TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    /** DO NOT call this method directly. Use {@link 
#declareNeededResourcesWithDelay()} instead. */
-    private void declareNeededResources() {
-        resourceAllocator.declareResourceNeeded(getResourceDeclaration());
-    }
-
-    @VisibleForTesting
-    int getNumberPendingTaskManagerSlots() {
-        return pendingSlots.size();
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // TaskExecutor idleness / redundancy
-    // 
---------------------------------------------------------------------------------------------
-
-    private void checkTaskManagerTimeoutsAndRedundancy() {
-        if (!taskManagerRegistrations.isEmpty()) {
-            long currentTime = System.currentTimeMillis();
-
-            ArrayList<TaskManagerRegistration> timedOutTaskManagers =
-                    new ArrayList<>(taskManagerRegistrations.size());
-
-            // first retrieve the timed out TaskManagers
-            for (TaskManagerRegistration taskManagerRegistration :
-                    taskManagerRegistrations.values()) {
-                if (currentTime - taskManagerRegistration.getIdleSince()
-                        >= taskManagerTimeout.toMilliseconds()) {
-                    // we collect the instance ids first in order to avoid 
concurrent modifications
-                    // by the
-                    // ResourceAllocator.releaseResource call
-                    timedOutTaskManagers.add(taskManagerRegistration);
-                }
-            }
-
-            int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - 
getNumberFreeSlots();
-            if (slotsDiff > 0) {
-                if (pendingSlots.isEmpty()) {
-                    // Keep enough redundant taskManagers from time to time.
-                    int requiredTaskManagers =
-                            MathUtils.divideRoundUp(slotsDiff, 
numSlotsPerWorker);
-                    allocateRedundantTaskManagers(requiredTaskManagers);
-                } else {
-                    LOG.debug(
-                            "There are some pending slots, skip allocate 
redundant task manager and wait them fulfilled.");
-                }
-            } else {
-                // second we trigger the release resource callback which can 
decide upon the
-                // resource release
-                int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
-                releaseIdleTaskExecutors(
-                        timedOutTaskManagers, Math.min(maxReleaseNum, 
timedOutTaskManagers.size()));
-            }
-        }
-    }
-
-    private void allocateRedundantTaskManagers(int number) {
-        LOG.debug("Allocating {} task executors for redundancy.", number);
-        int allocatedNumber = allocateWorkers(number);
-        if (number != allocatedNumber) {
-            LOG.warn(
-                    "Expect to allocate {} taskManagers. Actually allocate {} 
taskManagers.",
-                    number,
-                    allocatedNumber);
-        }
-    }
-
-    /**
-     * Allocate a number of workers based on the input param.
-     *
-     * @param workerNum the number of workers to allocate
-     * @return the number of successfully allocated workers
-     */
-    private int allocateWorkers(int workerNum) {
-        int allocatedWorkerNum = 0;
-        for (int i = 0; i < workerNum; ++i) {
-            if (allocateWorker(defaultSlotResourceProfile).isPresent()) {
-                ++allocatedWorkerNum;
-            } else {
-                break;
-            }
-        }
-        return allocatedWorkerNum;
-    }
-
-    private void releaseIdleTaskExecutors(
-            ArrayList<TaskManagerRegistration> timedOutTaskManagers, int 
releaseNum) {
-        for (int index = 0; index < releaseNum; ++index) {
-            if (waitResultConsumedBeforeRelease) {
-                
releaseIdleTaskExecutorIfPossible(timedOutTaskManagers.get(index));
-            } else {
-                
releaseIdleTaskExecutor(timedOutTaskManagers.get(index).getInstanceId());
-            }
-        }
-    }
-
-    private void releaseIdleTaskExecutorIfPossible(
-            TaskManagerRegistration taskManagerRegistration) {
-        long idleSince = taskManagerRegistration.getIdleSince();
-        taskManagerRegistration
-                .getTaskManagerConnection()
-                .getTaskExecutorGateway()
-                .canBeReleased()
-                .thenAcceptAsync(
-                        canBeReleased -> {
-                            InstanceID timedOutTaskManagerId =
-                                    taskManagerRegistration.getInstanceId();
-                            boolean stillIdle = idleSince == 
taskManagerRegistration.getIdleSince();
-                            if (stillIdle && canBeReleased) {
-                                releaseIdleTaskExecutor(timedOutTaskManagerId);
-                            }
-                        },
-                        mainThreadExecutor);
-    }
-
-    private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        Preconditions.checkState(resourceAllocator.isSupported());
-        LOG.debug(
-                "Release TaskExecutor {} because it exceeded the idle 
timeout.",
-                timedOutTaskManagerId);
-        unWantedWorkers.add(timedOutTaskManagerId);
-        declareNeededResourcesWithDelay();
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // slot / resource counts
-    // 
---------------------------------------------------------------------------------------------
-
-    public ResourceProfile getTotalRegisteredResources() {
-        return taskManagerRegistrations.values().stream()
-                .map(TaskManagerRegistration::getTotalResource)
-                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
-    }
-
-    public ResourceProfile getTotalRegisteredResourcesOf(InstanceID 
instanceID) {
-        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
-                .map(TaskManagerRegistration::getTotalResource)
-                .orElse(ResourceProfile.ZERO);
-    }
-
-    public ResourceProfile getTotalFreeResources() {
-        return taskManagerRegistrations.values().stream()
-                .map(
-                        taskManagerRegistration ->
-                                taskManagerRegistration
-                                        .getDefaultSlotResourceProfile()
-                                        
.multiply(taskManagerRegistration.getNumberFreeSlots()))
-                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
-    }
-
-    public ResourceProfile getTotalFreeResourcesOf(InstanceID instanceID) {
-        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
-                .map(
-                        taskManagerRegistration ->
-                                taskManagerRegistration
-                                        .getDefaultSlotResourceProfile()
-                                        
.multiply(taskManagerRegistration.getNumberFreeSlots()))
-                .orElse(ResourceProfile.ZERO);
-    }
-
-    public Iterable<SlotID> getSlotsOf(InstanceID instanceId) {
-        return taskManagerRegistrations.get(instanceId).getSlots();
-    }
-
-    public int getNumberRegisteredSlots() {
-        return taskManagerRegistrations.values().stream()
-                .map(TaskManagerRegistration::getNumberRegisteredSlots)
-                .reduce(0, Integer::sum);
-    }
-
-    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
-        TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceId);
-
-        if (taskManagerRegistration != null) {
-            return taskManagerRegistration.getNumberRegisteredSlots();
-        } else {
-            return 0;
-        }
-    }
-
-    public int getNumberFreeSlots() {
-        return taskManagerRegistrations.values().stream()
-                .map(TaskManagerRegistration::getNumberFreeSlots)
-                .reduce(0, Integer::sum);
-    }
-
-    public int getNumberFreeSlotsOf(InstanceID instanceId) {
-        TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceId);
-
-        if (taskManagerRegistration != null) {
-            return taskManagerRegistration.getNumberFreeSlots();
-        } else {
-            return 0;
-        }
-    }
-
-    public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
-        return pendingSlots.values();
-    }
-
-    /**
-     * remove unused pending task manager slots.
-     *
-     * @param unusedResourceCounter the count of unused resources.
-     */
-    public void removePendingTaskManagerSlots(ResourceCounter 
unusedResourceCounter) {
-        if (!resourceAllocator.isSupported()) {
-            return;
-        }
-        Preconditions.checkState(unusedResourceCounter.getResources().size() 
== 1);
-        Preconditions.checkState(
-                
unusedResourceCounter.getResources().contains(defaultSlotResourceProfile));
-
-        int wantedPendingSlotsNumber =
-                pendingSlots.size()
-                        - 
unusedResourceCounter.getResourceCount(defaultSlotResourceProfile);
-        pendingSlots.entrySet().removeIf(ignore -> pendingSlots.size() > 
wantedPendingSlotsNumber);
-
-        declareNeededResourcesWithDelay();
-    }
-
-    /** clear all pending task manager slots. */
-    public void clearPendingTaskManagerSlots() {
-        if (!resourceAllocator.isSupported()) {
-            return;
-        }
-        if (!pendingSlots.isEmpty()) {
-            this.pendingSlots.clear();
-            declareNeededResourcesWithDelay();
-        }
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // TaskExecutor slot book-keeping
-    // 
---------------------------------------------------------------------------------------------
-
-    public void occupySlot(InstanceID instanceId) {
-        taskManagerRegistrations.get(instanceId).occupySlot();
-    }
-
-    public void freeSlot(InstanceID instanceId) {
-        taskManagerRegistrations.get(instanceId).freeSlot();
-    }
-
-    public void markUsed(InstanceID instanceID) {
-        taskManagerRegistrations.get(instanceID).markUsed();
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
deleted file mode 100644
index 02915dc1a17..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-import java.util.HashSet;
-
-public class TaskManagerRegistration {
-
-    private final TaskExecutorConnection taskManagerConnection;
-
-    private final ResourceProfile defaultSlotResourceProfile;
-
-    private final ResourceProfile totalResource;
-
-    private final HashSet<SlotID> slots;
-
-    private int numberFreeSlots;
-
-    /** Timestamp when the last time becoming idle. Otherwise Long.MAX_VALUE. 
*/
-    private long idleSince;
-
-    public TaskManagerRegistration(
-            TaskExecutorConnection taskManagerConnection,
-            Collection<SlotID> slots,
-            ResourceProfile totalResourceProfile,
-            ResourceProfile defaultSlotResourceProfile) {
-
-        this.taskManagerConnection =
-                Preconditions.checkNotNull(taskManagerConnection, 
"taskManagerConnection");
-        Preconditions.checkNotNull(slots, "slots");
-
-        this.totalResource = Preconditions.checkNotNull(totalResourceProfile);
-        this.defaultSlotResourceProfile = 
Preconditions.checkNotNull(defaultSlotResourceProfile);
-
-        this.slots = new HashSet<>(slots);
-
-        this.numberFreeSlots = slots.size();
-
-        idleSince = System.currentTimeMillis();
-    }
-
-    public TaskExecutorConnection getTaskManagerConnection() {
-        return taskManagerConnection;
-    }
-
-    public InstanceID getInstanceId() {
-        return taskManagerConnection.getInstanceID();
-    }
-
-    public int getNumberRegisteredSlots() {
-        return slots.size();
-    }
-
-    public int getNumberFreeSlots() {
-        return numberFreeSlots;
-    }
-
-    public ResourceProfile getDefaultSlotResourceProfile() {
-        return defaultSlotResourceProfile;
-    }
-
-    public ResourceProfile getTotalResource() {
-        return totalResource;
-    }
-
-    public void freeSlot() {
-        Preconditions.checkState(
-                numberFreeSlots < slots.size(),
-                "The number of free slots cannot exceed the number of 
registered slots. This indicates a bug.");
-        numberFreeSlots++;
-
-        if (numberFreeSlots == getNumberRegisteredSlots() && idleSince == 
Long.MAX_VALUE) {
-            idleSince = System.currentTimeMillis();
-        }
-    }
-
-    public void occupySlot() {
-        Preconditions.checkState(
-                numberFreeSlots > 0, "There are no more free slots. This 
indicates a bug.");
-        numberFreeSlots--;
-
-        idleSince = Long.MAX_VALUE;
-    }
-
-    public Iterable<SlotID> getSlots() {
-        return slots;
-    }
-
-    public long getIdleSince() {
-        return idleSince;
-    }
-
-    public boolean isIdle() {
-        return idleSince != Long.MAX_VALUE;
-    }
-
-    public void markUsed() {
-        idleSince = Long.MAX_VALUE;
-    }
-
-    public boolean containsSlot(SlotID slotId) {
-        return slots.contains(slotId);
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
deleted file mode 100644
index 3331937da7a..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.util.AbstractID;
-
-/** Id of {@link PendingTaskManagerSlot}. */
-public class TaskManagerSlotId extends AbstractID {
-
-    private static final long serialVersionUID = -4024240625523472071L;
-
-    private TaskManagerSlotId() {}
-
-    public static TaskManagerSlotId generate() {
-        return new TaskManagerSlotId();
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
deleted file mode 100644
index 3479b550908..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.util.concurrent.Executors;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-import java.time.Duration;
-import java.util.concurrent.Executor;
-
-/** Builder for {@link TaskExecutorManager}. */
-public class TaskExecutorManagerBuilder {
-    private WorkerResourceSpec defaultWorkerResourceSpec =
-            new WorkerResourceSpec.Builder().setCpuCores(4).build();
-    private int numSlotsPerWorker = 1;
-    private int maxSlotNum = 1;
-    private boolean waitResultConsumedBeforeRelease = true;
-    private int redundantTaskManagerNum = 0;
-    private Time taskManagerTimeout = Time.seconds(5);
-    private Duration declareNeededResourceDelay = Duration.ofMillis(0);
-    private final ScheduledExecutor scheduledExecutor;
-    private Executor mainThreadExecutor = Executors.directExecutor();
-    private ResourceAllocator newResourceAllocator = new 
TestingResourceAllocatorBuilder().build();
-
-    public TaskExecutorManagerBuilder(ScheduledExecutor scheduledExecutor) {
-        this.scheduledExecutor = scheduledExecutor;
-    }
-
-    public TaskExecutorManagerBuilder setDefaultWorkerResourceSpec(
-            WorkerResourceSpec defaultWorkerResourceSpec) {
-        this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setNumSlotsPerWorker(int 
numSlotsPerWorker) {
-        this.numSlotsPerWorker = numSlotsPerWorker;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setMaxNumSlots(int maxSlotNum) {
-        this.maxSlotNum = maxSlotNum;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setWaitResultConsumedBeforeRelease(
-            boolean waitResultConsumedBeforeRelease) {
-        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setRedundantTaskManagerNum(int 
redundantTaskManagerNum) {
-        this.redundantTaskManagerNum = redundantTaskManagerNum;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setTaskManagerTimeout(Time 
taskManagerTimeout) {
-        this.taskManagerTimeout = taskManagerTimeout;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setMainThreadExecutor(Executor 
mainThreadExecutor) {
-        this.mainThreadExecutor = mainThreadExecutor;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setResourceAllocator(ResourceAllocator 
newResourceAllocator) {
-        this.newResourceAllocator = newResourceAllocator;
-        return this;
-    }
-
-    public TaskExecutorManagerBuilder setDeclareNeededResourceDelay(
-            Duration declareNeededResourceDelay) {
-        this.declareNeededResourceDelay = declareNeededResourceDelay;
-        return this;
-    }
-
-    public TaskExecutorManager createTaskExecutorManager() {
-        return new TaskExecutorManager(
-                defaultWorkerResourceSpec,
-                numSlotsPerWorker,
-                maxSlotNum,
-                waitResultConsumedBeforeRelease,
-                redundantTaskManagerNum,
-                taskManagerTimeout,
-                declareNeededResourceDelay,
-                scheduledExecutor,
-                mainThreadExecutor,
-                newResourceAllocator);
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
deleted file mode 100644
index cee5f7dd5ba..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorExtension;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for the {@link TaskExecutorManager}. */
-class TaskExecutorManagerTest {
-
-    @RegisterExtension
-    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorExtension();
-
-    /** Tests that a pending slot is only fulfilled by an exactly matching 
received slot. */
-    @Test
-    void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() {
-        final int numWorkerCpuCores = 3;
-        final WorkerResourceSpec workerResourceSpec =
-                new 
WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
-        final ResourceProfile requestedSlotProfile =
-                
ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();
-        final ResourceProfile offeredSlotProfile =
-                ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores - 
1).build();
-
-        try (final TaskExecutorManager taskExecutorManager =
-                createTaskExecutorManagerBuilder()
-                        .setDefaultWorkerResourceSpec(workerResourceSpec)
-                        .setNumSlotsPerWorker(
-                                1) // set to one so that the slot profiles 
directly correspond to
-                        // the worker spec
-                        .setMaxNumSlots(2)
-                        .createTaskExecutorManager()) {
-
-            // create pending slot
-            taskExecutorManager.allocateWorker(requestedSlotProfile);
-            
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
-
-            createAndRegisterTaskExecutor(taskExecutorManager, 1, 
offeredSlotProfile);
-
-            // the slot from the task executor should be accepted, but we 
should still be waiting
-            // for the originally requested slot
-            
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
-            
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
-        }
-    }
-
-    /** Tests that a pending slot is not fulfilled by an already allocated 
slot. */
-    @Test
-    void testPendingSlotNotFulfilledByAllocatedSlot() {
-        final int numWorkerCpuCores = 3;
-        final WorkerResourceSpec workerResourceSpec =
-                new 
WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
-        final ResourceProfile requestedSlotProfile =
-                
ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();
-
-        try (final TaskExecutorManager taskExecutorManager =
-                createTaskExecutorManagerBuilder()
-                        .setDefaultWorkerResourceSpec(workerResourceSpec)
-                        .setNumSlotsPerWorker(
-                                1) // set to one so that the slot profiles 
directly correspond to
-                        // the worker spec
-                        .setMaxNumSlots(2)
-                        .createTaskExecutorManager()) {
-
-            // create pending slot
-            taskExecutorManager.allocateWorker(requestedSlotProfile);
-            
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
-
-            final TaskExecutorConnection taskExecutorConnection = 
createTaskExecutorConnection();
-            final SlotReport slotReport =
-                    new SlotReport(
-                            new SlotStatus(
-                                    new 
SlotID(taskExecutorConnection.getResourceID(), 0),
-                                    requestedSlotProfile,
-                                    JobID.generate(),
-                                    new AllocationID()));
-            taskExecutorManager.registerTaskManager(
-                    taskExecutorConnection, slotReport, ResourceProfile.ANY, 
ResourceProfile.ANY);
-
-            // the slot from the task executor should be accepted, but we 
should still be waiting
-            // for the originally requested slot
-            
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
-            
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
-        }
-    }
-
-    /**
-     * Tests that a task manager timeout does not remove the slots from the 
SlotManager. A timeout
-     * should only trigger the {@link ResourceAllocator#declareResourceNeeded} 
callback. The
-     * receiver of the callback can then decide what to do with the 
TaskManager.
-     *
-     * <p>See FLINK-7793
-     */
-    @Test
-    void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
-        final Time taskManagerTimeout = Time.milliseconds(10L);
-
-        final CompletableFuture<InstanceID> releaseResourceFuture = new 
CompletableFuture<>();
-        final ResourceAllocator resourceAllocator =
-                createResourceAllocatorBuilder()
-                        .setDeclareResourceNeededConsumer(
-                                (resourceDeclarations) -> {
-                                    
assertThat(resourceDeclarations).hasSize(1);
-                                    ResourceDeclaration resourceDeclaration =
-                                            
resourceDeclarations.iterator().next();
-                                    
assertThat(resourceDeclaration.getNumNeeded()).isZero();
-                                    
assertThat(resourceDeclaration.getUnwantedWorkers()).hasSize(1);
-                                    releaseResourceFuture.complete(
-                                            resourceDeclaration
-                                                    .getUnwantedWorkers()
-                                                    .iterator()
-                                                    .next());
-                                })
-                        .build();
-
-        final Executor mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();
-
-        try (final TaskExecutorManager taskExecutorManager =
-                createTaskExecutorManagerBuilder()
-                        .setTaskManagerTimeout(taskManagerTimeout)
-                        .setResourceAllocator(resourceAllocator)
-                        .setMainThreadExecutor(mainThreadExecutor)
-                        .createTaskExecutorManager()) {
-
-            CompletableFuture.supplyAsync(
-                            () -> {
-                                InstanceID newTaskExecutorId =
-                                        createAndRegisterTaskExecutor(
-                                                taskExecutorManager, 1, 
ResourceProfile.ANY);
-                                
assertThat(taskExecutorManager.getNumberRegisteredSlots())
-                                        .isEqualTo(1);
-                                return newTaskExecutorId;
-                            },
-                            mainThreadExecutor)
-                    // wait for the timeout to occur
-                    .thenCombine(
-                            releaseResourceFuture,
-                            (registeredInstance, releasedInstance) -> {
-                                
assertThat(registeredInstance).isEqualTo(releasedInstance);
-                                
assertThat(taskExecutorManager.getNumberRegisteredSlots())
-                                        .isEqualTo(1);
-                                return registeredInstance;
-                            })
-                    .thenAccept(
-                            taskExecutorId -> {
-                                
taskExecutorManager.unregisterTaskExecutor(taskExecutorId);
-                                
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isZero();
-                            })
-                    .get();
-        }
-    }
-
-    /**
-     * Tests that formerly used task managers can timeout after all of their 
slots have been freed.
-     */
-    @Test
-    void testTimeoutForUnusedTaskManager() throws Exception {
-        WorkerResourceSpec workerResourceSpec =
-                new WorkerResourceSpec.Builder().setCpuCores(1).build();
-        final ResourceProfile resourceProfile = 
ResourceProfile.newBuilder().setCpuCores(1).build();
-        final Time taskManagerTimeout = Time.milliseconds(50L);
-
-        final AtomicInteger declareResourceCount = new AtomicInteger(0);
-        final CompletableFuture<InstanceID> releaseResourceFuture = new 
CompletableFuture<>();
-        final ResourceAllocator resourceAllocator =
-                new TestingResourceAllocatorBuilder()
-                        .setDeclareResourceNeededConsumer(
-                                (resourceDeclarations) -> {
-                                    
assertThat(resourceDeclarations.size()).isEqualTo(1);
-                                    ResourceDeclaration resourceDeclaration =
-                                            
resourceDeclarations.iterator().next();
-                                    if (declareResourceCount.getAndIncrement() 
== 0) {
-                                        
assertThat(resourceDeclaration.getNumNeeded()).isEqualTo(1);
-                                        
assertThat(resourceDeclaration.getUnwantedWorkers())
-                                                .isEmpty();
-                                    } else {
-                                        
assertThat(resourceDeclaration.getNumNeeded()).isZero();
-                                        
assertThat(resourceDeclaration.getUnwantedWorkers())
-                                                .hasSize(1);
-                                        releaseResourceFuture.complete(
-                                                resourceDeclaration
-                                                        .getUnwantedWorkers()
-                                                        .iterator()
-                                                        .next());
-                                    }
-                                })
-                        .build();
-
-        final Executor mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();
-
-        try (final TaskExecutorManager taskExecutorManager =
-                createTaskExecutorManagerBuilder()
-                        .setTaskManagerTimeout(taskManagerTimeout)
-                        .setDefaultWorkerResourceSpec(workerResourceSpec)
-                        .setResourceAllocator(resourceAllocator)
-                        .setMainThreadExecutor(mainThreadExecutor)
-                        .createTaskExecutorManager()) {
-
-            CompletableFuture.supplyAsync(
-                            () -> {
-                                
taskExecutorManager.allocateWorker(resourceProfile);
-                                InstanceID taskExecutorId =
-                                        createAndRegisterTaskExecutor(
-                                                taskExecutorManager, 1, 
resourceProfile);
-
-                                taskExecutorManager.occupySlot(taskExecutorId);
-                                taskExecutorManager.freeSlot(taskExecutorId);
-
-                                return taskExecutorId;
-                            },
-                            mainThreadExecutor)
-                    // wait for the timeout to occur
-                    .thenAcceptBoth(
-                            releaseResourceFuture,
-                            (registeredInstance, releasedInstance) ->
-                                    
assertThat(registeredInstance).isEqualTo(releasedInstance))
-                    .get();
-        }
-    }
-
-    @Test
-    void testRequestRedundantTaskManager() {
-        final ResourceProfile resourceProfile = 
ResourceProfile.newBuilder().setCpuCores(1).build();
-        final AtomicInteger declareResourceCount = new AtomicInteger(0);
-        final ResourceAllocator resourceAllocator =
-                new TestingResourceAllocatorBuilder()
-                        .setDeclareResourceNeededConsumer(
-                                (resourceDeclarations) -> 
declareResourceCount.getAndIncrement())
-                        .build();
-        ManuallyTriggeredScheduledExecutor taskRestartExecutor =
-                new ManuallyTriggeredScheduledExecutor();
-        try (final TaskExecutorManager taskExecutorManager =
-                new TaskExecutorManagerBuilder(taskRestartExecutor)
-                        .setRedundantTaskManagerNum(1)
-                        .setMaxNumSlots(10)
-                        .setResourceAllocator(resourceAllocator)
-                        .createTaskExecutorManager()) {
-
-            // do not check redundant task managers with no registered
-            taskRestartExecutor.triggerScheduledTasks();
-            assertThat(declareResourceCount).hasValue(0);
-
-            InstanceID taskExecutorId =
-                    createAndRegisterTaskExecutor(taskExecutorManager, 1, 
resourceProfile);
-            taskExecutorManager.occupySlot(taskExecutorId);
-            assertThat(declareResourceCount).hasValue(0);
-
-            // request 1 redundant task manager
-            taskRestartExecutor.triggerScheduledTasks();
-            assertThat(declareResourceCount).hasValue(1);
-
-            // will not trigger new redundant task managers when there are 
pending slots.
-            taskRestartExecutor.triggerScheduledTasks();
-            assertThat(declareResourceCount).hasValue(1);
-        }
-    }
-
-    /**
-     * Test that the task executor manager only allocates new workers if their 
worker spec can
-     * fulfill the requested resource profile.
-     */
-    @Test
-    void testWorkerOnlyAllocatedIfRequestedSlotCouldBeFulfilled() {
-        final int numCoresPerWorker = 1;
-
-        final WorkerResourceSpec workerResourceSpec =
-                new 
WorkerResourceSpec.Builder().setCpuCores(numCoresPerWorker).build();
-
-        final ResourceProfile requestedProfile =
-                ResourceProfile.newBuilder().setCpuCores(numCoresPerWorker + 
1).build();
-
-        final AtomicInteger declareResourceCount = new AtomicInteger(0);
-        ResourceAllocator resourceAllocator =
-                createResourceAllocatorBuilder()
-                        .setDeclareResourceNeededConsumer(
-                                (resourceDeclarations) -> 
declareResourceCount.incrementAndGet())
-                        .build();
-
-        try (final TaskExecutorManager taskExecutorManager =
-                createTaskExecutorManagerBuilder()
-                        .setDefaultWorkerResourceSpec(workerResourceSpec)
-                        .setNumSlotsPerWorker(1)
-                        .setMaxNumSlots(1)
-                        .setResourceAllocator(resourceAllocator)
-                        .createTaskExecutorManager()) {
-
-            
assertThat(taskExecutorManager.allocateWorker(requestedProfile)).isNotPresent();
-            assertThat(declareResourceCount).hasValue(0);
-        }
-    }
-
-    /**
-     * Test that the task executor manager respects the max limitation of the 
number of slots when
-     * allocating new workers.
-     */
-    @Test
-    void testMaxSlotLimitAllocateWorker() {
-        final int numberSlots = 1;
-        final int maxSlotNum = 1;
-
-        final List<Integer> resourceRequestNumber = new ArrayList<>();
-        ResourceAllocator resourceAllocator =
-                createResourceAllocatorBuilder()
-                        .setDeclareResourceNeededConsumer(
-                                (resourceDeclarations) -> {
-                                    
assertThat(resourceDeclarations).hasSize(1);
-                                    ResourceDeclaration resourceDeclaration =
-                                            
resourceDeclarations.iterator().next();
-                                    
resourceRequestNumber.add(resourceDeclaration.getNumNeeded());
-                                })
-                        .build();
-
-        try (final TaskExecutorManager taskExecutorManager =
-                createTaskExecutorManagerBuilder()
-                        .setNumSlotsPerWorker(numberSlots)
-                        .setMaxNumSlots(maxSlotNum)
-                        .setResourceAllocator(resourceAllocator)
-                        .createTaskExecutorManager()) {
-
-            assertThat(resourceRequestNumber).isEmpty();
-
-            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
-            assertThat(resourceRequestNumber).containsExactly(1);
-
-            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
-            assertThat(resourceRequestNumber).containsExactly(1);
-        }
-    }
-
-    /**
-     * Test that the slot manager release resource when the number of slots 
exceed max limit when
-     * new TaskExecutor registered.
-     */
-    @Test
-    void testMaxSlotLimitRegisterWorker() throws Exception {
-        final int numberSlots = 1;
-        final int maxSlotNum = 1;
-
-        try (final TaskExecutorManager taskExecutorManager =
-                createTaskExecutorManagerBuilder()
-                        .setNumSlotsPerWorker(numberSlots)
-                        .setMaxNumSlots(maxSlotNum)
-                        .createTaskExecutorManager()) {
-
-            createAndRegisterTaskExecutor(taskExecutorManager, 1, 
ResourceProfile.ANY);
-            createAndRegisterTaskExecutor(taskExecutorManager, 1, 
ResourceProfile.ANY);
-
-            
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
-        }
-    }
-
-    @Test
-    void testGetResourceOverview() {
-        final ResourceProfile resourceProfile1 = 
ResourceProfile.fromResources(1, 10);
-        final ResourceProfile resourceProfile2 = 
ResourceProfile.fromResources(2, 20);
-
-        try (final TaskExecutorManager taskExecutorManager =
-                
createTaskExecutorManagerBuilder().setMaxNumSlots(4).createTaskExecutorManager())
 {
-            final InstanceID instanceId1 =
-                    createAndRegisterTaskExecutor(taskExecutorManager, 2, 
resourceProfile1);
-            final InstanceID instanceId2 =
-                    createAndRegisterTaskExecutor(taskExecutorManager, 2, 
resourceProfile2);
-            taskExecutorManager.occupySlot(instanceId1);
-            taskExecutorManager.occupySlot(instanceId2);
-
-            assertThat(taskExecutorManager.getTotalFreeResources())
-                    .isEqualTo(resourceProfile1.merge(resourceProfile2));
-            
assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId1))
-                    .isEqualTo(resourceProfile1);
-            
assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId2))
-                    .isEqualTo(resourceProfile2);
-            assertThat(taskExecutorManager.getTotalRegisteredResources())
-                    
.isEqualTo(resourceProfile1.merge(resourceProfile2).multiply(2));
-            
assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1))
-                    .isEqualTo(resourceProfile1.multiply(2));
-            
assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2))
-                    .isEqualTo(resourceProfile2.multiply(2));
-        }
-    }
-
-    private static TaskExecutorManagerBuilder 
createTaskExecutorManagerBuilder() {
-        return new TaskExecutorManagerBuilder(
-                        new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
-                
.setResourceAllocator(createResourceAllocatorBuilder().build());
-    }
-
-    private static TestingResourceAllocatorBuilder 
createResourceAllocatorBuilder() {
-        return new TestingResourceAllocatorBuilder();
-    }
-
-    private static InstanceID createAndRegisterTaskExecutor(
-            TaskExecutorManager taskExecutorManager,
-            int numSlots,
-            ResourceProfile resourceProfile) {
-        final TaskExecutorConnection taskExecutorConnection = 
createTaskExecutorConnection();
-
-        List<SlotStatus> slotStatuses =
-                IntStream.range(0, numSlots)
-                        .mapToObj(
-                                slotNumber ->
-                                        new SlotStatus(
-                                                new SlotID(
-                                                        
taskExecutorConnection.getResourceID(),
-                                                        slotNumber),
-                                                resourceProfile))
-                        .collect(Collectors.toList());
-
-        final SlotReport slotReport = new SlotReport(slotStatuses);
-
-        taskExecutorManager.registerTaskManager(
-                taskExecutorConnection,
-                slotReport,
-                resourceProfile.multiply(numSlots),
-                resourceProfile);
-
-        return taskExecutorConnection.getInstanceID();
-    }
-
-    private static TaskExecutorConnection createTaskExecutorConnection() {
-        return new TaskExecutorConnection(
-                ResourceID.generate(),
-                new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerSlotInformation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerSlotInformation.java
deleted file mode 100644
index 99f67f5dd2c..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerSlotInformation.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-
-import javax.annotation.Nullable;
-
-/** Testing implementation of {@link TaskManagerSlotInformation}. */
-public final class TestingTaskManagerSlotInformation implements 
TaskManagerSlotInformation {
-
-    private final SlotID slotId;
-    private final InstanceID instanceId;
-    @Nullable private final AllocationID allocationId;
-    @Nullable private final JobID jobId;
-    private final ResourceProfile resourceProfile;
-    private final SlotState state;
-    private final TaskExecutorConnection taskExecutorConnection =
-            new TaskExecutorConnection(
-                    ResourceID.generate(),
-                    new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
-
-    private TestingTaskManagerSlotInformation(
-            SlotID slotId,
-            @Nullable AllocationID allocationId,
-            @Nullable JobID jobId,
-            InstanceID instanceId,
-            ResourceProfile resourceProfile,
-            SlotState state) {
-        this.slotId = slotId;
-        this.allocationId = allocationId;
-        this.jobId = jobId;
-        this.instanceId = instanceId;
-        this.resourceProfile = resourceProfile;
-        this.state = state;
-    }
-
-    @Override
-    public SlotID getSlotId() {
-        return slotId;
-    }
-
-    @Override
-    @Nullable
-    public JobID getJobId() {
-        return jobId;
-    }
-
-    @Override
-    @Nullable
-    public AllocationID getAllocationId() {
-        return allocationId;
-    }
-
-    @Override
-    public SlotState getState() {
-        return state;
-    }
-
-    @Override
-    public InstanceID getInstanceId() {
-        return instanceId;
-    }
-
-    @Override
-    public TaskExecutorConnection getTaskManagerConnection() {
-        return taskExecutorConnection;
-    }
-
-    @Override
-    public boolean isMatchingRequirement(ResourceProfile required) {
-        return resourceProfile.isMatching(required);
-    }
-
-    @Override
-    public ResourceProfile getResourceProfile() {
-        return resourceProfile;
-    }
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    static class Builder {
-        private SlotID slotId = new SlotID(ResourceID.generate(), 0);
-        private AllocationID allocationId = new AllocationID();
-        private JobID jobId = new JobID();
-        private InstanceID instanceId = new InstanceID();
-        private ResourceProfile resourceProfile = ResourceProfile.ANY;
-        private SlotState state = SlotState.FREE;
-
-        public Builder setState(SlotState state) {
-            this.state = state;
-            return this;
-        }
-
-        public Builder setInstanceId(InstanceID instanceId) {
-            this.instanceId = instanceId;
-            return this;
-        }
-
-        public Builder setResourceProfile(ResourceProfile resourceProfile) {
-            this.resourceProfile = resourceProfile;
-            return this;
-        }
-
-        public Builder setSlotId(SlotID slotId) {
-            this.slotId = slotId;
-            return this;
-        }
-
-        public Builder setAllocationId(AllocationID allocationId) {
-            this.allocationId = allocationId;
-            return this;
-        }
-
-        public Builder setJobId(JobID jobId) {
-            this.jobId = jobId;
-            return this;
-        }
-
-        public TestingTaskManagerSlotInformation build() {
-            return new TestingTaskManagerSlotInformation(
-                    slotId, allocationId, jobId, instanceId, resourceProfile, 
state);
-        }
-    }
-}

Reply via email to