This is an automated email from the ASF dual-hosted git repository.
huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7299da4cf68 [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers
periodically in FineGrainedSlotManager (#23230)
7299da4cf68 is described below
commit 7299da4cf688a2d87fd918b6327a0573bc88cbd8
Author: xiangyu0xf <[email protected]>
AuthorDate: Fri Aug 18 16:34:48 2023 +0800
[FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodically in
FineGrainedSlotManager (#23230)
---
.../DefaultResourceAllocationStrategy.java | 25 +++++++--
.../slotmanager/FineGrainedSlotManager.java | 39 +++++++-------
.../slotmanager/ResourceAllocationStrategy.java | 10 ++--
...aseResult.java => ResourceReconcileResult.java} | 36 ++++++++++---
.../DefaultResourceAllocationStrategyTest.java | 62 ++++++++++++++++++----
.../TestingResourceAllocationStrategy.java | 12 ++---
6 files changed, 135 insertions(+), 49 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index c19351e24cf..52bf0149702 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -137,11 +138,11 @@ public class DefaultResourceAllocationStrategy implements
ResourceAllocationStra
}
@Override
- public ResourceReleaseResult tryReleaseUnusedResources(
+ public ResourceReconcileResult tryReconcileClusterResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
ResourceProfile requiredRedundantResources =
totalResourceProfile.multiply(redundantTaskManagerNum);
- ResourceReleaseResult.Builder builder =
ResourceReleaseResult.builder();
+ ResourceReconcileResult.Builder builder =
ResourceReconcileResult.builder();
List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
@@ -212,6 +213,14 @@ public class DefaultResourceAllocationStrategy implements
ResourceAllocationStra
}
}
+ if (!redundantFulfilled) {
+ // fulfill redundant resources
+ tryFulFillRedundantResourcesWithAction(
+ requiredRedundantResources,
+ resourcesToKeep,
+ builder::addPendingTaskManagerToAllocate);
+ }
+
return builder.build();
}
@@ -350,10 +359,20 @@ public class DefaultResourceAllocationStrategy implements
ResourceAllocationStra
.map(internalResourceInfo ->
internalResourceInfo.availableProfile)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+ tryFulFillRedundantResourcesWithAction(
+ requiredRedundantResource,
+ totalAvailableResources,
+ resultBuilder::addPendingTaskManagerAllocate);
+ }
+
+ private void tryFulFillRedundantResourcesWithAction(
+ ResourceProfile requiredRedundantResource,
+ ResourceProfile totalAvailableResources,
+ Consumer<? super PendingTaskManager> fulfillAction) {
while (!canFulfillRequirement(requiredRedundantResource,
totalAvailableResources)) {
PendingTaskManager pendingTaskManager =
new PendingTaskManager(totalResourceProfile,
numSlotsPerWorker);
- resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
+ fulfillAction.accept(pendingTaskManager);
totalAvailableResources =
totalAvailableResources.merge(totalResourceProfile);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 772bcc8acdb..132b1cef665 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -117,7 +117,7 @@ public class FineGrainedSlotManager implements SlotManager {
/** Callbacks for resource not enough. */
@Nullable private ResourceEventListener resourceEventListener;
- @Nullable private ScheduledFuture<?> taskManagerReleasableCheck;
+ @Nullable private ScheduledFuture<?> clusterReconciliationCheck;
@Nullable private CompletableFuture<Void> requirementsCheckFuture;
@@ -164,7 +164,7 @@ public class FineGrainedSlotManager implements SlotManager {
resourceAllocator = null;
resourceEventListener = null;
mainThreadExecutor = null;
- taskManagerReleasableCheck = null;
+ clusterReconciliationCheck = null;
requirementsCheckFuture = null;
started = false;
@@ -222,9 +222,9 @@ public class FineGrainedSlotManager implements SlotManager {
started = true;
if (resourceAllocator.isSupported()) {
- taskManagerReleasableCheck =
+ clusterReconciliationCheck =
scheduledExecutor.scheduleWithFixedDelay(
- () ->
mainThreadExecutor.execute(this::tryReleaseUnusedTaskManagers),
+ () ->
mainThreadExecutor.execute(this::checkClusterReconciliation),
0L,
taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
@@ -252,9 +252,9 @@ public class FineGrainedSlotManager implements SlotManager {
slotManagerMetricGroup.close();
// stop the timeout checks for the TaskManagers
- if (taskManagerReleasableCheck != null) {
- taskManagerReleasableCheck.cancel(false);
- taskManagerReleasableCheck = null;
+ if (clusterReconciliationCheck != null) {
+ clusterReconciliationCheck.cancel(false);
+ clusterReconciliationCheck = null;
}
slotStatusSyncer.close();
@@ -291,7 +291,7 @@ public class FineGrainedSlotManager implements SlotManager {
resourceTracker.notifyResourceRequirements(jobId,
Collections.emptyList());
if (resourceAllocator.isSupported()) {
taskManagerTracker.clearPendingAllocationsOfJob(jobId);
- checkTaskManagerReleasable();
+ checkResourcesNeedReconcile();
declareNeededResourcesWithDelay();
}
}
@@ -623,7 +623,7 @@ public class FineGrainedSlotManager implements SlotManager {
if (resourceAllocator.isSupported()
&& !taskManagerTracker.getPendingTaskManagers().isEmpty())
{
taskManagerTracker.replaceAllPendingAllocations(Collections.emptyMap());
- checkTaskManagerReleasable();
+ checkResourcesNeedReconcile();
declareNeededResourcesWithDelay();
}
return;
@@ -679,7 +679,7 @@ public class FineGrainedSlotManager implements SlotManager {
}
if (resourceAllocator.isSupported()) {
- checkTaskManagerReleasable();
+ checkResourcesNeedReconcile();
declareNeededResourcesWithDelay();
}
}
@@ -812,29 +812,32 @@ public class FineGrainedSlotManager implements
SlotManager {
// Internal periodic check methods
//
---------------------------------------------------------------------------------------------
- private void tryReleaseUnusedTaskManagers() {
- if (checkTaskManagerReleasable()) {
+ private void checkClusterReconciliation() {
+ if (checkResourcesNeedReconcile()) {
// only declare on needed.
declareNeededResourcesWithDelay();
}
}
- private boolean checkTaskManagerReleasable() {
- ResourceReleaseResult releaseResult =
-
resourceAllocationStrategy.tryReleaseUnusedResources(taskManagerTracker);
+ private boolean checkResourcesNeedReconcile() {
+ ResourceReconcileResult reconcileResult =
+
resourceAllocationStrategy.tryReconcileClusterResources(taskManagerTracker);
- releaseResult.getPendingTaskManagersToRelease().stream()
+ reconcileResult.getPendingTaskManagersToRelease().stream()
.map(PendingTaskManager::getPendingTaskManagerId)
.forEach(taskManagerTracker::removePendingTaskManager);
- for (TaskManagerInfo taskManagerToRelease :
releaseResult.getTaskManagersToRelease()) {
+ for (TaskManagerInfo taskManagerToRelease :
reconcileResult.getTaskManagersToRelease()) {
if (waitResultConsumedBeforeRelease) {
releaseIdleTaskExecutorIfPossible(taskManagerToRelease);
} else {
releaseIdleTaskExecutor(taskManagerToRelease.getInstanceId());
}
}
- return releaseResult.needRelease();
+
+
reconcileResult.getPendingTaskManagersToAllocate().forEach(this::allocateResource);
+
+ return reconcileResult.needReconcile();
}
private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo
taskManagerInfo) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
index 46f7453d941..05f0d1db7dd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
@@ -48,15 +48,15 @@ public interface ResourceAllocationStrategy {
BlockedTaskManagerChecker blockedTaskManagerChecker);
/**
- * Try to make a release decision to release unused PendingTaskManagers
and TaskManagers. This
- * is more light weighted than {@link #tryFulfillRequirements}, only
consider empty registered /
- * pending workers and assume all requirements are fulfilled by registered
/ pending workers.
+ * Try to make a decision to reconcile the cluster resources. This is more
lightweight than
+ * {@link #tryFulfillRequirements}; it only considers empty registered /
pending workers and assumes
+ * all requirements are fulfilled by registered / pending workers.
*
* @param taskManagerResourceInfoProvider provide the registered/pending
resources of the
* current cluster
- * @return a {@link ResourceReleaseResult} based on the current status,
which contains the
+ * @return a {@link ResourceReconcileResult} based on the current status,
which contains the
* actions to take
*/
- ResourceReleaseResult tryReleaseUnusedResources(
+ ResourceReconcileResult tryReconcileClusterResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java
similarity index 63%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java
index f1482fd1a87..e9ff50345cb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java
@@ -22,17 +22,25 @@ import java.util.ArrayList;
import java.util.List;
/** Contains the results of the {@link ResourceAllocationStrategy}. */
-public class ResourceReleaseResult {
+public class ResourceReconcileResult {
+
+ private final List<PendingTaskManager> pendingTaskManagersToAllocate;
private final List<PendingTaskManager> pendingTaskManagersToRelease;
private final List<TaskManagerInfo> taskManagersToRelease;
- public ResourceReleaseResult(
+ public ResourceReconcileResult(
+ List<PendingTaskManager> pendingTaskManagersToAllocate,
List<PendingTaskManager> pendingTaskManagersToRelease,
List<TaskManagerInfo> taskManagersToRelease) {
+ this.pendingTaskManagersToAllocate = pendingTaskManagersToAllocate;
this.pendingTaskManagersToRelease = pendingTaskManagersToRelease;
this.taskManagersToRelease = taskManagersToRelease;
}
+ public List<PendingTaskManager> getPendingTaskManagersToAllocate() {
+ return pendingTaskManagersToAllocate;
+ }
+
public List<PendingTaskManager> getPendingTaskManagersToRelease() {
return pendingTaskManagersToRelease;
}
@@ -41,8 +49,15 @@ public class ResourceReleaseResult {
return taskManagersToRelease;
}
- public boolean needRelease() {
- return !pendingTaskManagersToRelease.isEmpty() ||
!taskManagersToRelease.isEmpty();
+ /**
+ * Returns whether the cluster resources need to be reconciled.
+ *
+ * @return True if the cluster resources need to be reconciled, otherwise false.
+ */
+ public boolean needReconcile() {
+ return pendingTaskManagersToRelease.size() > 0
+ || taskManagersToRelease.size() > 0
+ || pendingTaskManagersToAllocate.size() > 0;
}
public static Builder builder() {
@@ -50,9 +65,15 @@ public class ResourceReleaseResult {
}
public static class Builder {
+ private final List<PendingTaskManager> pendingTaskManagersToAllocate =
new ArrayList<>();
private final List<PendingTaskManager> pendingTaskManagersToRelease =
new ArrayList<>();
private final List<TaskManagerInfo> taskManagersToRelease = new
ArrayList<>();
+ public Builder addPendingTaskManagerToAllocate(PendingTaskManager
pendingTaskManager) {
+ this.pendingTaskManagersToAllocate.add(pendingTaskManager);
+ return this;
+ }
+
public Builder addPendingTaskManagerToRelease(PendingTaskManager
pendingTaskManager) {
this.pendingTaskManagersToRelease.add(pendingTaskManager);
return this;
@@ -63,8 +84,11 @@ public class ResourceReleaseResult {
return this;
}
- public ResourceReleaseResult build() {
- return new ResourceReleaseResult(pendingTaskManagersToRelease,
taskManagersToRelease);
+ public ResourceReconcileResult build() {
+ return new ResourceReconcileResult(
+ pendingTaskManagersToAllocate,
+ pendingTaskManagersToRelease,
+ taskManagersToRelease);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
index f05de374192..18288cdabb8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
@@ -329,14 +329,15 @@ class DefaultResourceAllocationStrategyTest {
() ->
Collections.singleton(registeredTaskManager))
.build();
- ResourceReleaseResult result =
-
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+ ResourceReconcileResult result =
+
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
assertThat(result.getTaskManagersToRelease()).isEmpty();
registeredTaskManager.setIdleSince(System.currentTimeMillis() - 10);
- result =
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+ result =
+
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
assertThat(result.getTaskManagersToRelease()).containsExactly(registeredTaskManager);
}
@@ -350,8 +351,8 @@ class DefaultResourceAllocationStrategyTest {
() ->
Collections.singleton(pendingTaskManager))
.build();
- ResourceReleaseResult result =
-
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+ ResourceReconcileResult result =
+
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
assertThat(result.getPendingTaskManagersToRelease()).containsExactly(pendingTaskManager);
}
@@ -369,8 +370,8 @@ class DefaultResourceAllocationStrategyTest {
() ->
Collections.singleton(pendingTaskManager))
.build();
- ResourceReleaseResult result =
-
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+ ResourceReconcileResult result =
+
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
}
@@ -440,13 +441,52 @@ class DefaultResourceAllocationStrategyTest {
.build();
DefaultResourceAllocationStrategy strategy = createStrategy(1);
- ResourceReleaseResult result =
-
strategy.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+ ResourceReconcileResult result =
+
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
assertThat(result.getPendingTaskManagersToRelease())
.containsExactly(pendingTaskManagerIdle);
assertThat(result.getTaskManagersToRelease()).containsExactly(taskManagerIdle);
}
+ @Test
+ void testRedundantResourceShouldBeFulfilled() {
+ final TaskManagerInfo taskManagerInUse =
+ new TestingTaskManagerInfo(
+ DEFAULT_SLOT_RESOURCE.multiply(5),
+ DEFAULT_SLOT_RESOURCE.multiply(2),
+ DEFAULT_SLOT_RESOURCE);
+
+ final TestingTaskManagerInfo taskManagerIdle =
+ new TestingTaskManagerInfo(
+ DEFAULT_SLOT_RESOURCE.multiply(5),
+ DEFAULT_SLOT_RESOURCE.multiply(5),
+ DEFAULT_SLOT_RESOURCE);
+ taskManagerIdle.setIdleSince(System.currentTimeMillis() - 10);
+
+ final PendingTaskManager pendingTaskManagerIdle =
+ new PendingTaskManager(DEFAULT_SLOT_RESOURCE.multiply(5),
NUM_OF_SLOTS);
+
+ final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+ TestingTaskManagerResourceInfoProvider.newBuilder()
+ .setRegisteredTaskManagersSupplier(
+ () -> Arrays.asList(taskManagerInUse,
taskManagerIdle))
+ .setPendingTaskManagersSupplier(
+ () ->
Collections.singletonList(pendingTaskManagerIdle))
+ .build();
+
+ DefaultResourceAllocationStrategy strategy = createStrategy(4);
+ ResourceReconcileResult result =
+
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
+
+ // pending task manager should be reserved for redundancy
+ assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
+ // both in-use and idle task managers should be reserved for redundancy
+ assertThat(result.getTaskManagersToRelease()).isEmpty();
+ // add two more pending task managers for redundancy since the total
available resources equal
+ // 12 (2+5+5)
+ assertThat(result.getPendingTaskManagersToAllocate()).hasSize(2);
+ }
+
@Test
void testRedundantResourceShouldBeReserved() {
final TaskManagerInfo taskManagerInUse =
@@ -474,8 +514,8 @@ class DefaultResourceAllocationStrategyTest {
.build();
DefaultResourceAllocationStrategy strategy = createStrategy(1);
- ResourceReleaseResult result =
-
strategy.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+ ResourceReconcileResult result =
+
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
// pending task manager should release at first
assertThat(result.getPendingTaskManagersToRelease())
.containsExactly(pendingTaskManagerIdle);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
index 096425a34b4..3fbf1e51db2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
@@ -35,7 +35,7 @@ public class TestingResourceAllocationStrategy implements
ResourceAllocationStra
ResourceAllocationResult>
tryFulfillRequirementsFunction;
- private final Function<TaskManagerResourceInfoProvider,
ResourceReleaseResult>
+ private final Function<TaskManagerResourceInfoProvider,
ResourceReconcileResult>
tryReleaseUnusedResourcesFunction;
private TestingResourceAllocationStrategy(
@@ -44,7 +44,7 @@ public class TestingResourceAllocationStrategy implements
ResourceAllocationStra
TaskManagerResourceInfoProvider,
ResourceAllocationResult>
tryFulfillRequirementsFunction,
- Function<TaskManagerResourceInfoProvider, ResourceReleaseResult>
+ Function<TaskManagerResourceInfoProvider, ResourceReconcileResult>
tryReleaseUnusedResourcesFunction) {
this.tryFulfillRequirementsFunction =
Preconditions.checkNotNull(tryFulfillRequirementsFunction);
@@ -62,7 +62,7 @@ public class TestingResourceAllocationStrategy implements
ResourceAllocationStra
}
@Override
- public ResourceReleaseResult tryReleaseUnusedResources(
+ public ResourceReconcileResult tryReconcileClusterResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
return
tryReleaseUnusedResourcesFunction.apply(taskManagerResourceInfoProvider);
}
@@ -79,9 +79,9 @@ public class TestingResourceAllocationStrategy implements
ResourceAllocationStra
tryFulfillRequirementsFunction =
(ignored0, ignored1) ->
ResourceAllocationResult.builder().build();
- private Function<TaskManagerResourceInfoProvider,
ResourceReleaseResult>
+ private Function<TaskManagerResourceInfoProvider,
ResourceReconcileResult>
tryReleaseUnusedResourcesFunction =
- ignored -> ResourceReleaseResult.builder().build();
+ ignored -> ResourceReconcileResult.builder().build();
public Builder setTryFulfillRequirementsFunction(
BiFunction<
@@ -94,7 +94,7 @@ public class TestingResourceAllocationStrategy implements
ResourceAllocationStra
}
public Builder setTryReleaseUnusedResourcesFunction(
- Function<TaskManagerResourceInfoProvider,
ResourceReleaseResult>
+ Function<TaskManagerResourceInfoProvider,
ResourceReconcileResult>
tryReleaseUnusedResourcesFunction) {
this.tryReleaseUnusedResourcesFunction =
tryReleaseUnusedResourcesFunction;
return this;