This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f1fba33d85a [FLINK-34345][runtime] Remove TaskExecutorManager related logic
f1fba33d85a is described below
commit f1fba33d85a802b896170ff3cdb0107ee082c44a
Author: caicancai <[email protected]>
AuthorDate: Sat Feb 3 14:24:56 2024 +0800
[FLINK-34345][runtime] Remove TaskExecutorManager related logic
---
.../slotmanager/PendingTaskManagerSlot.java | 41 --
.../slotmanager/SlotStatusUpdateListener.java | 45 --
.../slotmanager/TaskExecutorManager.java | 590 ---------------------
.../slotmanager/TaskManagerRegistration.java | 127 -----
.../slotmanager/TaskManagerSlotId.java | 33 --
.../slotmanager/TaskExecutorManagerBuilder.java | 107 ----
.../slotmanager/TaskExecutorManagerTest.java | 468 ----------------
.../TestingTaskManagerSlotInformation.java | 150 ------
8 files changed, 1561 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
deleted file mode 100644
index 336c4dae956..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-
-/** Represents a pending task manager slot in the {@link SlotManager}. */
-public class PendingTaskManagerSlot {
-
- private final TaskManagerSlotId taskManagerSlotId =
TaskManagerSlotId.generate();
-
- private final ResourceProfile resourceProfile;
-
- public PendingTaskManagerSlot(ResourceProfile resourceProfile) {
- this.resourceProfile = resourceProfile;
- }
-
- public TaskManagerSlotId getTaskManagerSlotId() {
- return taskManagerSlotId;
- }
-
- public ResourceProfile getResourceProfile() {
- return resourceProfile;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java
deleted file mode 100644
index bc24e1122e2..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-
-/**
- * Interface for components that want to listen to updates to the status of a slot.
- *
- * <p>This interface must only be used for updating data-structures, NOT for initiating new resource
- * allocations. The event that caused the state transition may also have triggered a series of
- * transitions, which new allocations would interfere with.
- */
-interface SlotStatusUpdateListener {
-
- /**
- * Notification for the status of a slot having changed.
- *
- * <p>If the slot is being freed ({@code current == FREE} then {@code jobId} is that of the job
- * the slot was allocated for. If the slot was already acquired by a job ({@code current !=
- * FREE}, then {@code jobId} is the ID of this very job.
- *
- * @param slot slot whose status has changed
- * @param previous state before the change
- * @param current state after the change
- * @param jobId job for which the slot was/is allocated for
- */
- void notifySlotStatusChange(
- TaskManagerSlotInformation slot, SlotState previous, SlotState
current, JobID jobId);
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
deleted file mode 100644
index 221bd44d46c..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ /dev/null
@@ -1,590 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/**
- * SlotManager component for various task executor related responsibilities of the slot manager,
- * including:
- *
- * <ul>
- * <li>tracking registered task executors
- * <li>allocating new task executors (both on-demand, and for redundancy)
- * <li>releasing idle task executors
- * <li>tracking pending slots (expected slots from executors that are currently being allocated
- * <li>tracking how many slots are used on each task executor
- * </ul>
- *
- * <p>Dev note: This component only exists to keep the code out of the slot manager. It covers many
- * aspects that aren't really the responsibility of the slot manager, and should be refactored to
- * live outside the slot manager and split into multiple parts.
- */
-class TaskExecutorManager implements AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutorManager.class);
-
- private final ResourceProfile defaultSlotResourceProfile;
-
- /** The default resource spec of workers to request. */
- private final WorkerResourceSpec defaultWorkerResourceSpec;
-
- private final int numSlotsPerWorker;
-
- /** Defines the max limitation of the total number of slots. */
- private final int maxSlotNum;
-
- /**
- * Release task executor only when each produced result partition is
either consumed or failed.
- */
- private final boolean waitResultConsumedBeforeRelease;
-
- /** Defines the number of redundant taskmanagers. */
- private final int redundantTaskManagerNum;
-
- /** Timeout after which an unused TaskManager is released. */
- private final Time taskManagerTimeout;
-
- /** Callbacks for resource (de-)allocations. */
- private final ResourceAllocator resourceAllocator;
-
- /** All currently registered task managers. */
- private final Map<InstanceID, TaskManagerRegistration>
taskManagerRegistrations =
- new HashMap<>();
-
- private final Map<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots
= new HashMap<>();
-
- private final Executor mainThreadExecutor;
-
- @Nullable private final ScheduledFuture<?>
taskManagerTimeoutsAndRedundancyCheck;
-
- private final Set<InstanceID> unWantedWorkers;
- private final ScheduledExecutor scheduledExecutor;
- private final Duration declareNeededResourceDelay;
- private CompletableFuture<Void> declareNeededResourceFuture;
-
- TaskExecutorManager(
- WorkerResourceSpec defaultWorkerResourceSpec,
- int numSlotsPerWorker,
- int maxNumSlots,
- boolean waitResultConsumedBeforeRelease,
- int redundantTaskManagerNum,
- Time taskManagerTimeout,
- Duration declareNeededResourceDelay,
- ScheduledExecutor scheduledExecutor,
- Executor mainThreadExecutor,
- ResourceAllocator resourceAllocator) {
-
- this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
- this.numSlotsPerWorker = numSlotsPerWorker;
- this.maxSlotNum = maxNumSlots;
- this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
- this.redundantTaskManagerNum = redundantTaskManagerNum;
- this.taskManagerTimeout = taskManagerTimeout;
- this.defaultSlotResourceProfile =
- SlotManagerUtils.generateDefaultSlotResourceProfile(
- defaultWorkerResourceSpec, numSlotsPerWorker);
- this.scheduledExecutor = scheduledExecutor;
- this.declareNeededResourceDelay = declareNeededResourceDelay;
- this.unWantedWorkers = new HashSet<>();
- this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
- this.mainThreadExecutor = mainThreadExecutor;
- if (resourceAllocator.isSupported()) {
- taskManagerTimeoutsAndRedundancyCheck =
- scheduledExecutor.scheduleWithFixedDelay(
- () ->
- mainThreadExecutor.execute(
-
this::checkTaskManagerTimeoutsAndRedundancy),
- 0L,
- taskManagerTimeout.toMilliseconds(),
- TimeUnit.MILLISECONDS);
- } else {
- taskManagerTimeoutsAndRedundancyCheck = null;
- }
- }
-
- @Override
- public void close() {
- if (taskManagerTimeoutsAndRedundancyCheck != null) {
- taskManagerTimeoutsAndRedundancyCheck.cancel(false);
- }
- }
-
- //
---------------------------------------------------------------------------------------------
- // TaskExecutor (un)registration
- //
---------------------------------------------------------------------------------------------
-
- public boolean isTaskManagerRegistered(InstanceID instanceId) {
- return taskManagerRegistrations.containsKey(instanceId);
- }
-
- public boolean registerTaskManager(
- final TaskExecutorConnection taskExecutorConnection,
- SlotReport initialSlotReport,
- ResourceProfile totalResourceProfile,
- ResourceProfile defaultSlotResourceProfile) {
- if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
- LOG.info(
- "The total number of slots exceeds the max limitation {},
could not register the excess task executor {}.",
- maxSlotNum,
- taskExecutorConnection.getInstanceID());
- return false;
- }
-
- TaskManagerRegistration taskManagerRegistration =
- new TaskManagerRegistration(
- taskExecutorConnection,
- StreamSupport.stream(initialSlotReport.spliterator(),
false)
- .map(SlotStatus::getSlotID)
- .collect(Collectors.toList()),
- totalResourceProfile,
- defaultSlotResourceProfile);
-
- taskManagerRegistrations.put(
- taskExecutorConnection.getInstanceID(),
taskManagerRegistration);
-
- // next register the new slots
- for (SlotStatus slotStatus : initialSlotReport) {
- if (slotStatus.getJobID() == null) {
-
findAndRemoveExactlyMatchingPendingTaskManagerSlot(slotStatus.getResourceProfile());
- }
- }
-
- return true;
- }
-
- private boolean isMaxSlotNumExceededAfterRegistration(SlotReport
initialSlotReport) {
- // check if the total number exceed before matching pending slot.
- if
(!isMaxSlotNumExceededAfterAdding(initialSlotReport.getNumSlotStatus())) {
- return false;
- }
-
- // check if the total number exceed slots after consuming pending slot.
- return
isMaxSlotNumExceededAfterAdding(getNumNonPendingReportedNewSlots(initialSlotReport));
- }
-
- private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
- final Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
-
- for (SlotStatus slotStatus : slotReport) {
- if (slotStatus.getAllocationID() != null) {
- // only empty registered slots can match pending slots
- continue;
- }
-
- for (PendingTaskManagerSlot pendingTaskManagerSlot :
pendingSlots.values()) {
- if
(!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId())
- && isPendingSlotExactlyMatchingResourceProfile(
- pendingTaskManagerSlot,
slotStatus.getResourceProfile())) {
-
matchingPendingSlots.add(pendingTaskManagerSlot.getTaskManagerSlotId());
- break; // pendingTaskManagerSlot loop
- }
- }
- }
- return slotReport.getNumSlotStatus() - matchingPendingSlots.size();
- }
-
- private void findAndRemoveExactlyMatchingPendingTaskManagerSlot(
- ResourceProfile resourceProfile) {
- for (PendingTaskManagerSlot pendingTaskManagerSlot :
pendingSlots.values()) {
- if (isPendingSlotExactlyMatchingResourceProfile(
- pendingTaskManagerSlot, resourceProfile)) {
-
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
- return;
- }
- }
- }
-
- private boolean isPendingSlotExactlyMatchingResourceProfile(
- PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile
resourceProfile) {
- return
pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
- }
-
- public void unregisterTaskExecutor(InstanceID instanceId) {
- taskManagerRegistrations.remove(instanceId);
- unWantedWorkers.remove(instanceId);
- }
-
- public Collection<InstanceID> getTaskExecutors() {
- return new ArrayList<>(taskManagerRegistrations.keySet());
- }
-
- //
---------------------------------------------------------------------------------------------
- // TaskExecutor allocation
- //
---------------------------------------------------------------------------------------------
-
- /**
- * Tries to allocate a worker that can provide a slot with the given
resource profile.
- *
- * @param requestedSlotResourceProfile desired slot profile
- * @return an upper bound resource requirement that can be fulfilled by
the new worker, if one
- * was allocated
- */
- public Optional<ResourceRequirement> allocateWorker(
- ResourceProfile requestedSlotResourceProfile) {
- if (!resourceAllocator.isSupported()) {
- // resource cannot be allocated
- return Optional.empty();
- }
-
- final int numRegisteredSlots = getNumberRegisteredSlots();
- final int numPendingSlots = getNumberPendingTaskManagerSlots();
- if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
- LOG.warn(
- "Could not allocate {} more slots. The number of
registered and pending slots is {}, while the maximum is {}.",
- numSlotsPerWorker,
- numPendingSlots + numRegisteredSlots,
- maxSlotNum);
- return Optional.empty();
- }
-
- if
(!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) {
- // requested resource profile is unfulfillable
- return Optional.empty();
- }
-
- for (int i = 0; i < numSlotsPerWorker; ++i) {
- PendingTaskManagerSlot pendingTaskManagerSlot =
- new PendingTaskManagerSlot(defaultSlotResourceProfile);
- pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(),
pendingTaskManagerSlot);
- }
-
- declareNeededResourcesWithDelay();
-
- return Optional.of(
- ResourceRequirement.create(defaultSlotResourceProfile,
numSlotsPerWorker));
- }
-
- private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
- return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots()
+ numNewSlot
- > maxSlotNum;
- }
-
- private Collection<ResourceDeclaration> getResourceDeclaration() {
- final int pendingWorkerNum =
- MathUtils.divideRoundUp(getNumberPendingTaskManagerSlots(),
numSlotsPerWorker);
- Set<InstanceID> neededRegisteredWorkers = new
HashSet<>(taskManagerRegistrations.keySet());
- neededRegisteredWorkers.removeAll(unWantedWorkers);
- final int totalWorkerNum = pendingWorkerNum +
neededRegisteredWorkers.size();
-
- return Collections.singleton(
- new ResourceDeclaration(
- defaultWorkerResourceSpec, totalWorkerNum, new
HashSet<>(unWantedWorkers)));
- }
-
- private void declareNeededResourcesWithDelay() {
- Preconditions.checkState(resourceAllocator.isSupported());
-
- if (declareNeededResourceDelay.toMillis() <= 0) {
- declareNeededResources();
- } else {
- if (declareNeededResourceFuture == null ||
declareNeededResourceFuture.isDone()) {
- declareNeededResourceFuture = new CompletableFuture<>();
- scheduledExecutor.schedule(
- () ->
- mainThreadExecutor.execute(
- () -> {
- declareNeededResources();
-
Preconditions.checkNotNull(declareNeededResourceFuture)
- .complete(null);
- }),
- declareNeededResourceDelay.toMillis(),
- TimeUnit.MILLISECONDS);
- }
- }
- }
-
- /** DO NOT call this method directly. Use {@link
#declareNeededResourcesWithDelay()} instead. */
- private void declareNeededResources() {
- resourceAllocator.declareResourceNeeded(getResourceDeclaration());
- }
-
- @VisibleForTesting
- int getNumberPendingTaskManagerSlots() {
- return pendingSlots.size();
- }
-
- //
---------------------------------------------------------------------------------------------
- // TaskExecutor idleness / redundancy
- //
---------------------------------------------------------------------------------------------
-
- private void checkTaskManagerTimeoutsAndRedundancy() {
- if (!taskManagerRegistrations.isEmpty()) {
- long currentTime = System.currentTimeMillis();
-
- ArrayList<TaskManagerRegistration> timedOutTaskManagers =
- new ArrayList<>(taskManagerRegistrations.size());
-
- // first retrieve the timed out TaskManagers
- for (TaskManagerRegistration taskManagerRegistration :
- taskManagerRegistrations.values()) {
- if (currentTime - taskManagerRegistration.getIdleSince()
- >= taskManagerTimeout.toMilliseconds()) {
- // we collect the instance ids first in order to avoid
concurrent modifications
- // by the
- // ResourceAllocator.releaseResource call
- timedOutTaskManagers.add(taskManagerRegistration);
- }
- }
-
- int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker -
getNumberFreeSlots();
- if (slotsDiff > 0) {
- if (pendingSlots.isEmpty()) {
- // Keep enough redundant taskManagers from time to time.
- int requiredTaskManagers =
- MathUtils.divideRoundUp(slotsDiff,
numSlotsPerWorker);
- allocateRedundantTaskManagers(requiredTaskManagers);
- } else {
- LOG.debug(
- "There are some pending slots, skip allocate
redundant task manager and wait them fulfilled.");
- }
- } else {
- // second we trigger the release resource callback which can
decide upon the
- // resource release
- int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
- releaseIdleTaskExecutors(
- timedOutTaskManagers, Math.min(maxReleaseNum,
timedOutTaskManagers.size()));
- }
- }
- }
-
- private void allocateRedundantTaskManagers(int number) {
- LOG.debug("Allocating {} task executors for redundancy.", number);
- int allocatedNumber = allocateWorkers(number);
- if (number != allocatedNumber) {
- LOG.warn(
- "Expect to allocate {} taskManagers. Actually allocate {}
taskManagers.",
- number,
- allocatedNumber);
- }
- }
-
- /**
- * Allocate a number of workers based on the input param.
- *
- * @param workerNum the number of workers to allocate
- * @return the number of successfully allocated workers
- */
- private int allocateWorkers(int workerNum) {
- int allocatedWorkerNum = 0;
- for (int i = 0; i < workerNum; ++i) {
- if (allocateWorker(defaultSlotResourceProfile).isPresent()) {
- ++allocatedWorkerNum;
- } else {
- break;
- }
- }
- return allocatedWorkerNum;
- }
-
- private void releaseIdleTaskExecutors(
- ArrayList<TaskManagerRegistration> timedOutTaskManagers, int
releaseNum) {
- for (int index = 0; index < releaseNum; ++index) {
- if (waitResultConsumedBeforeRelease) {
-
releaseIdleTaskExecutorIfPossible(timedOutTaskManagers.get(index));
- } else {
-
releaseIdleTaskExecutor(timedOutTaskManagers.get(index).getInstanceId());
- }
- }
- }
-
- private void releaseIdleTaskExecutorIfPossible(
- TaskManagerRegistration taskManagerRegistration) {
- long idleSince = taskManagerRegistration.getIdleSince();
- taskManagerRegistration
- .getTaskManagerConnection()
- .getTaskExecutorGateway()
- .canBeReleased()
- .thenAcceptAsync(
- canBeReleased -> {
- InstanceID timedOutTaskManagerId =
- taskManagerRegistration.getInstanceId();
- boolean stillIdle = idleSince ==
taskManagerRegistration.getIdleSince();
- if (stillIdle && canBeReleased) {
- releaseIdleTaskExecutor(timedOutTaskManagerId);
- }
- },
- mainThreadExecutor);
- }
-
- private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
- Preconditions.checkState(resourceAllocator.isSupported());
- LOG.debug(
- "Release TaskExecutor {} because it exceeded the idle
timeout.",
- timedOutTaskManagerId);
- unWantedWorkers.add(timedOutTaskManagerId);
- declareNeededResourcesWithDelay();
- }
-
- //
---------------------------------------------------------------------------------------------
- // slot / resource counts
- //
---------------------------------------------------------------------------------------------
-
- public ResourceProfile getTotalRegisteredResources() {
- return taskManagerRegistrations.values().stream()
- .map(TaskManagerRegistration::getTotalResource)
- .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
- }
-
- public ResourceProfile getTotalRegisteredResourcesOf(InstanceID
instanceID) {
- return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
- .map(TaskManagerRegistration::getTotalResource)
- .orElse(ResourceProfile.ZERO);
- }
-
- public ResourceProfile getTotalFreeResources() {
- return taskManagerRegistrations.values().stream()
- .map(
- taskManagerRegistration ->
- taskManagerRegistration
- .getDefaultSlotResourceProfile()
-
.multiply(taskManagerRegistration.getNumberFreeSlots()))
- .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
- }
-
- public ResourceProfile getTotalFreeResourcesOf(InstanceID instanceID) {
- return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
- .map(
- taskManagerRegistration ->
- taskManagerRegistration
- .getDefaultSlotResourceProfile()
-
.multiply(taskManagerRegistration.getNumberFreeSlots()))
- .orElse(ResourceProfile.ZERO);
- }
-
- public Iterable<SlotID> getSlotsOf(InstanceID instanceId) {
- return taskManagerRegistrations.get(instanceId).getSlots();
- }
-
- public int getNumberRegisteredSlots() {
- return taskManagerRegistrations.values().stream()
- .map(TaskManagerRegistration::getNumberRegisteredSlots)
- .reduce(0, Integer::sum);
- }
-
- public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
- TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(instanceId);
-
- if (taskManagerRegistration != null) {
- return taskManagerRegistration.getNumberRegisteredSlots();
- } else {
- return 0;
- }
- }
-
- public int getNumberFreeSlots() {
- return taskManagerRegistrations.values().stream()
- .map(TaskManagerRegistration::getNumberFreeSlots)
- .reduce(0, Integer::sum);
- }
-
- public int getNumberFreeSlotsOf(InstanceID instanceId) {
- TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(instanceId);
-
- if (taskManagerRegistration != null) {
- return taskManagerRegistration.getNumberFreeSlots();
- } else {
- return 0;
- }
- }
-
- public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
- return pendingSlots.values();
- }
-
- /**
- * remove unused pending task manager slots.
- *
- * @param unusedResourceCounter the count of unused resources.
- */
- public void removePendingTaskManagerSlots(ResourceCounter
unusedResourceCounter) {
- if (!resourceAllocator.isSupported()) {
- return;
- }
- Preconditions.checkState(unusedResourceCounter.getResources().size()
== 1);
- Preconditions.checkState(
-
unusedResourceCounter.getResources().contains(defaultSlotResourceProfile));
-
- int wantedPendingSlotsNumber =
- pendingSlots.size()
- -
unusedResourceCounter.getResourceCount(defaultSlotResourceProfile);
- pendingSlots.entrySet().removeIf(ignore -> pendingSlots.size() >
wantedPendingSlotsNumber);
-
- declareNeededResourcesWithDelay();
- }
-
- /** clear all pending task manager slots. */
- public void clearPendingTaskManagerSlots() {
- if (!resourceAllocator.isSupported()) {
- return;
- }
- if (!pendingSlots.isEmpty()) {
- this.pendingSlots.clear();
- declareNeededResourcesWithDelay();
- }
- }
-
- //
---------------------------------------------------------------------------------------------
- // TaskExecutor slot book-keeping
- //
---------------------------------------------------------------------------------------------
-
- public void occupySlot(InstanceID instanceId) {
- taskManagerRegistrations.get(instanceId).occupySlot();
- }
-
- public void freeSlot(InstanceID instanceId) {
- taskManagerRegistrations.get(instanceId).freeSlot();
- }
-
- public void markUsed(InstanceID instanceID) {
- taskManagerRegistrations.get(instanceID).markUsed();
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
deleted file mode 100644
index 02915dc1a17..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-import java.util.HashSet;
-
-public class TaskManagerRegistration {
-
- private final TaskExecutorConnection taskManagerConnection;
-
- private final ResourceProfile defaultSlotResourceProfile;
-
- private final ResourceProfile totalResource;
-
- private final HashSet<SlotID> slots;
-
- private int numberFreeSlots;
-
- /** Timestamp when the last time becoming idle. Otherwise Long.MAX_VALUE.
*/
- private long idleSince;
-
- public TaskManagerRegistration(
- TaskExecutorConnection taskManagerConnection,
- Collection<SlotID> slots,
- ResourceProfile totalResourceProfile,
- ResourceProfile defaultSlotResourceProfile) {
-
- this.taskManagerConnection =
- Preconditions.checkNotNull(taskManagerConnection,
"taskManagerConnection");
- Preconditions.checkNotNull(slots, "slots");
-
- this.totalResource = Preconditions.checkNotNull(totalResourceProfile);
- this.defaultSlotResourceProfile =
Preconditions.checkNotNull(defaultSlotResourceProfile);
-
- this.slots = new HashSet<>(slots);
-
- this.numberFreeSlots = slots.size();
-
- idleSince = System.currentTimeMillis();
- }
-
- public TaskExecutorConnection getTaskManagerConnection() {
- return taskManagerConnection;
- }
-
- public InstanceID getInstanceId() {
- return taskManagerConnection.getInstanceID();
- }
-
- public int getNumberRegisteredSlots() {
- return slots.size();
- }
-
- public int getNumberFreeSlots() {
- return numberFreeSlots;
- }
-
- public ResourceProfile getDefaultSlotResourceProfile() {
- return defaultSlotResourceProfile;
- }
-
- public ResourceProfile getTotalResource() {
- return totalResource;
- }
-
- public void freeSlot() {
- Preconditions.checkState(
- numberFreeSlots < slots.size(),
- "The number of free slots cannot exceed the number of
registered slots. This indicates a bug.");
- numberFreeSlots++;
-
- if (numberFreeSlots == getNumberRegisteredSlots() && idleSince ==
Long.MAX_VALUE) {
- idleSince = System.currentTimeMillis();
- }
- }
-
- public void occupySlot() {
- Preconditions.checkState(
- numberFreeSlots > 0, "There are no more free slots. This
indicates a bug.");
- numberFreeSlots--;
-
- idleSince = Long.MAX_VALUE;
- }
-
- public Iterable<SlotID> getSlots() {
- return slots;
- }
-
- public long getIdleSince() {
- return idleSince;
- }
-
- public boolean isIdle() {
- return idleSince != Long.MAX_VALUE;
- }
-
- public void markUsed() {
- idleSince = Long.MAX_VALUE;
- }
-
- public boolean containsSlot(SlotID slotId) {
- return slots.contains(slotId);
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
deleted file mode 100644
index 3331937da7a..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.util.AbstractID;
-
-/** Id of {@link PendingTaskManagerSlot}. */
-public class TaskManagerSlotId extends AbstractID {
-
- private static final long serialVersionUID = -4024240625523472071L;
-
- private TaskManagerSlotId() {}
-
- public static TaskManagerSlotId generate() {
- return new TaskManagerSlotId();
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
deleted file mode 100644
index 3479b550908..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.util.concurrent.Executors;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-import java.time.Duration;
-import java.util.concurrent.Executor;
-
-/** Builder for {@link TaskExecutorManager}. */
-public class TaskExecutorManagerBuilder {
- private WorkerResourceSpec defaultWorkerResourceSpec =
- new WorkerResourceSpec.Builder().setCpuCores(4).build();
- private int numSlotsPerWorker = 1;
- private int maxSlotNum = 1;
- private boolean waitResultConsumedBeforeRelease = true;
- private int redundantTaskManagerNum = 0;
- private Time taskManagerTimeout = Time.seconds(5);
- private Duration declareNeededResourceDelay = Duration.ofMillis(0);
- private final ScheduledExecutor scheduledExecutor;
- private Executor mainThreadExecutor = Executors.directExecutor();
- private ResourceAllocator newResourceAllocator = new
TestingResourceAllocatorBuilder().build();
-
- public TaskExecutorManagerBuilder(ScheduledExecutor scheduledExecutor) {
- this.scheduledExecutor = scheduledExecutor;
- }
-
- public TaskExecutorManagerBuilder setDefaultWorkerResourceSpec(
- WorkerResourceSpec defaultWorkerResourceSpec) {
- this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
- return this;
- }
-
- public TaskExecutorManagerBuilder setNumSlotsPerWorker(int
numSlotsPerWorker) {
- this.numSlotsPerWorker = numSlotsPerWorker;
- return this;
- }
-
- public TaskExecutorManagerBuilder setMaxNumSlots(int maxSlotNum) {
- this.maxSlotNum = maxSlotNum;
- return this;
- }
-
- public TaskExecutorManagerBuilder setWaitResultConsumedBeforeRelease(
- boolean waitResultConsumedBeforeRelease) {
- this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
- return this;
- }
-
- public TaskExecutorManagerBuilder setRedundantTaskManagerNum(int
redundantTaskManagerNum) {
- this.redundantTaskManagerNum = redundantTaskManagerNum;
- return this;
- }
-
- public TaskExecutorManagerBuilder setTaskManagerTimeout(Time
taskManagerTimeout) {
- this.taskManagerTimeout = taskManagerTimeout;
- return this;
- }
-
- public TaskExecutorManagerBuilder setMainThreadExecutor(Executor
mainThreadExecutor) {
- this.mainThreadExecutor = mainThreadExecutor;
- return this;
- }
-
- public TaskExecutorManagerBuilder setResourceAllocator(ResourceAllocator
newResourceAllocator) {
- this.newResourceAllocator = newResourceAllocator;
- return this;
- }
-
- public TaskExecutorManagerBuilder setDeclareNeededResourceDelay(
- Duration declareNeededResourceDelay) {
- this.declareNeededResourceDelay = declareNeededResourceDelay;
- return this;
- }
-
- public TaskExecutorManager createTaskExecutorManager() {
- return new TaskExecutorManager(
- defaultWorkerResourceSpec,
- numSlotsPerWorker,
- maxSlotNum,
- waitResultConsumedBeforeRelease,
- redundantTaskManagerNum,
- taskManagerTimeout,
- declareNeededResourceDelay,
- scheduledExecutor,
- mainThreadExecutor,
- newResourceAllocator);
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
deleted file mode 100644
index cee5f7dd5ba..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorExtension;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for the {@link TaskExecutorManager}. */
-class TaskExecutorManagerTest {
-
- @RegisterExtension
- static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorExtension();
-
- /** Tests that a pending slot is only fulfilled by an exactly matching
received slot. */
- @Test
- void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() {
- final int numWorkerCpuCores = 3;
- final WorkerResourceSpec workerResourceSpec =
- new
WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
- final ResourceProfile requestedSlotProfile =
-
ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();
- final ResourceProfile offeredSlotProfile =
- ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores -
1).build();
-
- try (final TaskExecutorManager taskExecutorManager =
- createTaskExecutorManagerBuilder()
- .setDefaultWorkerResourceSpec(workerResourceSpec)
- .setNumSlotsPerWorker(
- 1) // set to one so that the slot profiles
directly correspond to
- // the worker spec
- .setMaxNumSlots(2)
- .createTaskExecutorManager()) {
-
- // create pending slot
- taskExecutorManager.allocateWorker(requestedSlotProfile);
-
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
-
- createAndRegisterTaskExecutor(taskExecutorManager, 1,
offeredSlotProfile);
-
- // the slot from the task executor should be accepted, but we
should still be waiting
- // for the originally requested slot
-
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
-
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
- }
- }
-
- /** Tests that a pending slot is not fulfilled by an already allocated
slot. */
- @Test
- void testPendingSlotNotFulfilledByAllocatedSlot() {
- final int numWorkerCpuCores = 3;
- final WorkerResourceSpec workerResourceSpec =
- new
WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
- final ResourceProfile requestedSlotProfile =
-
ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();
-
- try (final TaskExecutorManager taskExecutorManager =
- createTaskExecutorManagerBuilder()
- .setDefaultWorkerResourceSpec(workerResourceSpec)
- .setNumSlotsPerWorker(
- 1) // set to one so that the slot profiles
directly correspond to
- // the worker spec
- .setMaxNumSlots(2)
- .createTaskExecutorManager()) {
-
- // create pending slot
- taskExecutorManager.allocateWorker(requestedSlotProfile);
-
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
-
- final TaskExecutorConnection taskExecutorConnection =
createTaskExecutorConnection();
- final SlotReport slotReport =
- new SlotReport(
- new SlotStatus(
- new
SlotID(taskExecutorConnection.getResourceID(), 0),
- requestedSlotProfile,
- JobID.generate(),
- new AllocationID()));
- taskExecutorManager.registerTaskManager(
- taskExecutorConnection, slotReport, ResourceProfile.ANY,
ResourceProfile.ANY);
-
- // the slot from the task executor should be accepted, but we
should still be waiting
- // for the originally requested slot
-
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
-
assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots()).isEqualTo(1);
- }
- }
-
- /**
- * Tests that a task manager timeout does not remove the slots from the
SlotManager. A timeout
- * should only trigger the {@link ResourceAllocator#declareResourceNeeded}
callback. The
- * receiver of the callback can then decide what to do with the
TaskManager.
- *
- * <p>See FLINK-7793
- */
- @Test
- void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
- final Time taskManagerTimeout = Time.milliseconds(10L);
-
- final CompletableFuture<InstanceID> releaseResourceFuture = new
CompletableFuture<>();
- final ResourceAllocator resourceAllocator =
- createResourceAllocatorBuilder()
- .setDeclareResourceNeededConsumer(
- (resourceDeclarations) -> {
-
assertThat(resourceDeclarations).hasSize(1);
- ResourceDeclaration resourceDeclaration =
-
resourceDeclarations.iterator().next();
-
assertThat(resourceDeclaration.getNumNeeded()).isZero();
-
assertThat(resourceDeclaration.getUnwantedWorkers()).hasSize(1);
- releaseResourceFuture.complete(
- resourceDeclaration
- .getUnwantedWorkers()
- .iterator()
- .next());
- })
- .build();
-
- final Executor mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();
-
- try (final TaskExecutorManager taskExecutorManager =
- createTaskExecutorManagerBuilder()
- .setTaskManagerTimeout(taskManagerTimeout)
- .setResourceAllocator(resourceAllocator)
- .setMainThreadExecutor(mainThreadExecutor)
- .createTaskExecutorManager()) {
-
- CompletableFuture.supplyAsync(
- () -> {
- InstanceID newTaskExecutorId =
- createAndRegisterTaskExecutor(
- taskExecutorManager, 1,
ResourceProfile.ANY);
-
assertThat(taskExecutorManager.getNumberRegisteredSlots())
- .isEqualTo(1);
- return newTaskExecutorId;
- },
- mainThreadExecutor)
- // wait for the timeout to occur
- .thenCombine(
- releaseResourceFuture,
- (registeredInstance, releasedInstance) -> {
-
assertThat(registeredInstance).isEqualTo(releasedInstance);
-
assertThat(taskExecutorManager.getNumberRegisteredSlots())
- .isEqualTo(1);
- return registeredInstance;
- })
- .thenAccept(
- taskExecutorId -> {
-
taskExecutorManager.unregisterTaskExecutor(taskExecutorId);
-
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isZero();
- })
- .get();
- }
- }
-
- /**
- * Tests that formerly used task managers can timeout after all of their
slots have been freed.
- */
- @Test
- void testTimeoutForUnusedTaskManager() throws Exception {
- WorkerResourceSpec workerResourceSpec =
- new WorkerResourceSpec.Builder().setCpuCores(1).build();
- final ResourceProfile resourceProfile =
ResourceProfile.newBuilder().setCpuCores(1).build();
- final Time taskManagerTimeout = Time.milliseconds(50L);
-
- final AtomicInteger declareResourceCount = new AtomicInteger(0);
- final CompletableFuture<InstanceID> releaseResourceFuture = new
CompletableFuture<>();
- final ResourceAllocator resourceAllocator =
- new TestingResourceAllocatorBuilder()
- .setDeclareResourceNeededConsumer(
- (resourceDeclarations) -> {
-
assertThat(resourceDeclarations.size()).isEqualTo(1);
- ResourceDeclaration resourceDeclaration =
-
resourceDeclarations.iterator().next();
- if (declareResourceCount.getAndIncrement()
== 0) {
-
assertThat(resourceDeclaration.getNumNeeded()).isEqualTo(1);
-
assertThat(resourceDeclaration.getUnwantedWorkers())
- .isEmpty();
- } else {
-
assertThat(resourceDeclaration.getNumNeeded()).isZero();
-
assertThat(resourceDeclaration.getUnwantedWorkers())
- .hasSize(1);
- releaseResourceFuture.complete(
- resourceDeclaration
- .getUnwantedWorkers()
- .iterator()
- .next());
- }
- })
- .build();
-
- final Executor mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();
-
- try (final TaskExecutorManager taskExecutorManager =
- createTaskExecutorManagerBuilder()
- .setTaskManagerTimeout(taskManagerTimeout)
- .setDefaultWorkerResourceSpec(workerResourceSpec)
- .setResourceAllocator(resourceAllocator)
- .setMainThreadExecutor(mainThreadExecutor)
- .createTaskExecutorManager()) {
-
- CompletableFuture.supplyAsync(
- () -> {
-
taskExecutorManager.allocateWorker(resourceProfile);
- InstanceID taskExecutorId =
- createAndRegisterTaskExecutor(
- taskExecutorManager, 1,
resourceProfile);
-
- taskExecutorManager.occupySlot(taskExecutorId);
- taskExecutorManager.freeSlot(taskExecutorId);
-
- return taskExecutorId;
- },
- mainThreadExecutor)
- // wait for the timeout to occur
- .thenAcceptBoth(
- releaseResourceFuture,
- (registeredInstance, releasedInstance) ->
-
assertThat(registeredInstance).isEqualTo(releasedInstance))
- .get();
- }
- }
-
- @Test
- void testRequestRedundantTaskManager() {
- final ResourceProfile resourceProfile =
ResourceProfile.newBuilder().setCpuCores(1).build();
- final AtomicInteger declareResourceCount = new AtomicInteger(0);
- final ResourceAllocator resourceAllocator =
- new TestingResourceAllocatorBuilder()
- .setDeclareResourceNeededConsumer(
- (resourceDeclarations) ->
declareResourceCount.getAndIncrement())
- .build();
- ManuallyTriggeredScheduledExecutor taskRestartExecutor =
- new ManuallyTriggeredScheduledExecutor();
- try (final TaskExecutorManager taskExecutorManager =
- new TaskExecutorManagerBuilder(taskRestartExecutor)
- .setRedundantTaskManagerNum(1)
- .setMaxNumSlots(10)
- .setResourceAllocator(resourceAllocator)
- .createTaskExecutorManager()) {
-
- // do not check redundant task managers with no registered
- taskRestartExecutor.triggerScheduledTasks();
- assertThat(declareResourceCount).hasValue(0);
-
- InstanceID taskExecutorId =
- createAndRegisterTaskExecutor(taskExecutorManager, 1,
resourceProfile);
- taskExecutorManager.occupySlot(taskExecutorId);
- assertThat(declareResourceCount).hasValue(0);
-
- // request 1 redundant task manager
- taskRestartExecutor.triggerScheduledTasks();
- assertThat(declareResourceCount).hasValue(1);
-
- // will not trigger new redundant task managers when there are
pending slots.
- taskRestartExecutor.triggerScheduledTasks();
- assertThat(declareResourceCount).hasValue(1);
- }
- }
-
- /**
- * Test that the task executor manager only allocates new workers if their
worker spec can
- * fulfill the requested resource profile.
- */
- @Test
- void testWorkerOnlyAllocatedIfRequestedSlotCouldBeFulfilled() {
- final int numCoresPerWorker = 1;
-
- final WorkerResourceSpec workerResourceSpec =
- new
WorkerResourceSpec.Builder().setCpuCores(numCoresPerWorker).build();
-
- final ResourceProfile requestedProfile =
- ResourceProfile.newBuilder().setCpuCores(numCoresPerWorker +
1).build();
-
- final AtomicInteger declareResourceCount = new AtomicInteger(0);
- ResourceAllocator resourceAllocator =
- createResourceAllocatorBuilder()
- .setDeclareResourceNeededConsumer(
- (resourceDeclarations) ->
declareResourceCount.incrementAndGet())
- .build();
-
- try (final TaskExecutorManager taskExecutorManager =
- createTaskExecutorManagerBuilder()
- .setDefaultWorkerResourceSpec(workerResourceSpec)
- .setNumSlotsPerWorker(1)
- .setMaxNumSlots(1)
- .setResourceAllocator(resourceAllocator)
- .createTaskExecutorManager()) {
-
-
assertThat(taskExecutorManager.allocateWorker(requestedProfile)).isNotPresent();
- assertThat(declareResourceCount).hasValue(0);
- }
- }
-
- /**
- * Test that the task executor manager respects the max limitation of the
number of slots when
- * allocating new workers.
- */
- @Test
- void testMaxSlotLimitAllocateWorker() {
- final int numberSlots = 1;
- final int maxSlotNum = 1;
-
- final List<Integer> resourceRequestNumber = new ArrayList<>();
- ResourceAllocator resourceAllocator =
- createResourceAllocatorBuilder()
- .setDeclareResourceNeededConsumer(
- (resourceDeclarations) -> {
-
assertThat(resourceDeclarations).hasSize(1);
- ResourceDeclaration resourceDeclaration =
-
resourceDeclarations.iterator().next();
-
resourceRequestNumber.add(resourceDeclaration.getNumNeeded());
- })
- .build();
-
- try (final TaskExecutorManager taskExecutorManager =
- createTaskExecutorManagerBuilder()
- .setNumSlotsPerWorker(numberSlots)
- .setMaxNumSlots(maxSlotNum)
- .setResourceAllocator(resourceAllocator)
- .createTaskExecutorManager()) {
-
- assertThat(resourceRequestNumber).isEmpty();
-
- taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
- assertThat(resourceRequestNumber).containsExactly(1);
-
- taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
- assertThat(resourceRequestNumber).containsExactly(1);
- }
- }
-
- /**
- * Test that the slot manager release resource when the number of slots
exceed max limit when
- * new TaskExecutor registered.
- */
- @Test
- void testMaxSlotLimitRegisterWorker() throws Exception {
- final int numberSlots = 1;
- final int maxSlotNum = 1;
-
- try (final TaskExecutorManager taskExecutorManager =
- createTaskExecutorManagerBuilder()
- .setNumSlotsPerWorker(numberSlots)
- .setMaxNumSlots(maxSlotNum)
- .createTaskExecutorManager()) {
-
- createAndRegisterTaskExecutor(taskExecutorManager, 1,
ResourceProfile.ANY);
- createAndRegisterTaskExecutor(taskExecutorManager, 1,
ResourceProfile.ANY);
-
-
assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
- }
- }
-
- @Test
- void testGetResourceOverview() {
- final ResourceProfile resourceProfile1 =
ResourceProfile.fromResources(1, 10);
- final ResourceProfile resourceProfile2 =
ResourceProfile.fromResources(2, 20);
-
- try (final TaskExecutorManager taskExecutorManager =
-
createTaskExecutorManagerBuilder().setMaxNumSlots(4).createTaskExecutorManager())
{
- final InstanceID instanceId1 =
- createAndRegisterTaskExecutor(taskExecutorManager, 2,
resourceProfile1);
- final InstanceID instanceId2 =
- createAndRegisterTaskExecutor(taskExecutorManager, 2,
resourceProfile2);
- taskExecutorManager.occupySlot(instanceId1);
- taskExecutorManager.occupySlot(instanceId2);
-
- assertThat(taskExecutorManager.getTotalFreeResources())
- .isEqualTo(resourceProfile1.merge(resourceProfile2));
-
assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId1))
- .isEqualTo(resourceProfile1);
-
assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId2))
- .isEqualTo(resourceProfile2);
- assertThat(taskExecutorManager.getTotalRegisteredResources())
-
.isEqualTo(resourceProfile1.merge(resourceProfile2).multiply(2));
-
assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1))
- .isEqualTo(resourceProfile1.multiply(2));
-
assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2))
- .isEqualTo(resourceProfile2.multiply(2));
- }
- }
-
- private static TaskExecutorManagerBuilder
createTaskExecutorManagerBuilder() {
- return new TaskExecutorManagerBuilder(
- new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
-
.setResourceAllocator(createResourceAllocatorBuilder().build());
- }
-
- private static TestingResourceAllocatorBuilder
createResourceAllocatorBuilder() {
- return new TestingResourceAllocatorBuilder();
- }
-
- private static InstanceID createAndRegisterTaskExecutor(
- TaskExecutorManager taskExecutorManager,
- int numSlots,
- ResourceProfile resourceProfile) {
- final TaskExecutorConnection taskExecutorConnection =
createTaskExecutorConnection();
-
- List<SlotStatus> slotStatuses =
- IntStream.range(0, numSlots)
- .mapToObj(
- slotNumber ->
- new SlotStatus(
- new SlotID(
-
taskExecutorConnection.getResourceID(),
- slotNumber),
- resourceProfile))
- .collect(Collectors.toList());
-
- final SlotReport slotReport = new SlotReport(slotStatuses);
-
- taskExecutorManager.registerTaskManager(
- taskExecutorConnection,
- slotReport,
- resourceProfile.multiply(numSlots),
- resourceProfile);
-
- return taskExecutorConnection.getInstanceID();
- }
-
- private static TaskExecutorConnection createTaskExecutorConnection() {
- return new TaskExecutorConnection(
- ResourceID.generate(),
- new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerSlotInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerSlotInformation.java
deleted file mode 100644
index 99f67f5dd2c..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerSlotInformation.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-
-import javax.annotation.Nullable;
-
-/** Testing implementation of {@link TaskManagerSlotInformation}. */
-public final class TestingTaskManagerSlotInformation implements
TaskManagerSlotInformation {
-
- private final SlotID slotId;
- private final InstanceID instanceId;
- @Nullable private final AllocationID allocationId;
- @Nullable private final JobID jobId;
- private final ResourceProfile resourceProfile;
- private final SlotState state;
- private final TaskExecutorConnection taskExecutorConnection =
- new TaskExecutorConnection(
- ResourceID.generate(),
- new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
-
- private TestingTaskManagerSlotInformation(
- SlotID slotId,
- @Nullable AllocationID allocationId,
- @Nullable JobID jobId,
- InstanceID instanceId,
- ResourceProfile resourceProfile,
- SlotState state) {
- this.slotId = slotId;
- this.allocationId = allocationId;
- this.jobId = jobId;
- this.instanceId = instanceId;
- this.resourceProfile = resourceProfile;
- this.state = state;
- }
-
- @Override
- public SlotID getSlotId() {
- return slotId;
- }
-
- @Override
- @Nullable
- public JobID getJobId() {
- return jobId;
- }
-
- @Override
- @Nullable
- public AllocationID getAllocationId() {
- return allocationId;
- }
-
- @Override
- public SlotState getState() {
- return state;
- }
-
- @Override
- public InstanceID getInstanceId() {
- return instanceId;
- }
-
- @Override
- public TaskExecutorConnection getTaskManagerConnection() {
- return taskExecutorConnection;
- }
-
- @Override
- public boolean isMatchingRequirement(ResourceProfile required) {
- return resourceProfile.isMatching(required);
- }
-
- @Override
- public ResourceProfile getResourceProfile() {
- return resourceProfile;
- }
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- static class Builder {
- private SlotID slotId = new SlotID(ResourceID.generate(), 0);
- private AllocationID allocationId = new AllocationID();
- private JobID jobId = new JobID();
- private InstanceID instanceId = new InstanceID();
- private ResourceProfile resourceProfile = ResourceProfile.ANY;
- private SlotState state = SlotState.FREE;
-
- public Builder setState(SlotState state) {
- this.state = state;
- return this;
- }
-
- public Builder setInstanceId(InstanceID instanceId) {
- this.instanceId = instanceId;
- return this;
- }
-
- public Builder setResourceProfile(ResourceProfile resourceProfile) {
- this.resourceProfile = resourceProfile;
- return this;
- }
-
- public Builder setSlotId(SlotID slotId) {
- this.slotId = slotId;
- return this;
- }
-
- public Builder setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- return this;
- }
-
- public Builder setJobId(JobID jobId) {
- this.jobId = jobId;
- return this;
- }
-
- public TestingTaskManagerSlotInformation build() {
- return new TestingTaskManagerSlotInformation(
- slotId, allocationId, jobId, instanceId, resourceProfile,
state);
- }
- }
-}