This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7299da4cf68 [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodically in FineGrainedSlotManager (#23230)
7299da4cf68 is described below

commit 7299da4cf688a2d87fd918b6327a0573bc88cbd8
Author: xiangyu0xf <[email protected]>
AuthorDate: Fri Aug 18 16:34:48 2023 +0800

    [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodically in FineGrainedSlotManager (#23230)
---
 .../DefaultResourceAllocationStrategy.java         | 25 +++++++--
 .../slotmanager/FineGrainedSlotManager.java        | 39 +++++++-------
 .../slotmanager/ResourceAllocationStrategy.java    | 10 ++--
 ...aseResult.java => ResourceReconcileResult.java} | 36 ++++++++++---
 .../DefaultResourceAllocationStrategyTest.java     | 62 ++++++++++++++++++----
 .../TestingResourceAllocationStrategy.java         | 12 ++---
 6 files changed, 135 insertions(+), 49 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index c19351e24cf..52bf0149702 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -137,11 +138,11 @@ public class DefaultResourceAllocationStrategy implements 
ResourceAllocationStra
     }
 
     @Override
-    public ResourceReleaseResult tryReleaseUnusedResources(
+    public ResourceReconcileResult tryReconcileClusterResources(
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
         ResourceProfile requiredRedundantResources =
                 totalResourceProfile.multiply(redundantTaskManagerNum);
-        ResourceReleaseResult.Builder builder = 
ResourceReleaseResult.builder();
+        ResourceReconcileResult.Builder builder = 
ResourceReconcileResult.builder();
 
         List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
         List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
@@ -212,6 +213,14 @@ public class DefaultResourceAllocationStrategy implements 
ResourceAllocationStra
             }
         }
 
+        if (!redundantFulfilled) {
+            // fulfill redundant resources
+            tryFulFillRedundantResourcesWithAction(
+                    requiredRedundantResources,
+                    resourcesToKeep,
+                    builder::addPendingTaskManagerToAllocate);
+        }
+
         return builder.build();
     }
 
@@ -350,10 +359,20 @@ public class DefaultResourceAllocationStrategy implements 
ResourceAllocationStra
                         .map(internalResourceInfo -> 
internalResourceInfo.availableProfile)
                         .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
 
+        tryFulFillRedundantResourcesWithAction(
+                requiredRedundantResource,
+                totalAvailableResources,
+                resultBuilder::addPendingTaskManagerAllocate);
+    }
+
+    private void tryFulFillRedundantResourcesWithAction(
+            ResourceProfile requiredRedundantResource,
+            ResourceProfile totalAvailableResources,
+            Consumer<? super PendingTaskManager> fulfillAction) {
         while (!canFulfillRequirement(requiredRedundantResource, 
totalAvailableResources)) {
             PendingTaskManager pendingTaskManager =
                     new PendingTaskManager(totalResourceProfile, 
numSlotsPerWorker);
-            resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
+            fulfillAction.accept(pendingTaskManager);
             totalAvailableResources = 
totalAvailableResources.merge(totalResourceProfile);
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 772bcc8acdb..132b1cef665 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -117,7 +117,7 @@ public class FineGrainedSlotManager implements SlotManager {
     /** Callbacks for resource not enough. */
     @Nullable private ResourceEventListener resourceEventListener;
 
-    @Nullable private ScheduledFuture<?> taskManagerReleasableCheck;
+    @Nullable private ScheduledFuture<?> clusterReconciliationCheck;
 
     @Nullable private CompletableFuture<Void> requirementsCheckFuture;
 
@@ -164,7 +164,7 @@ public class FineGrainedSlotManager implements SlotManager {
         resourceAllocator = null;
         resourceEventListener = null;
         mainThreadExecutor = null;
-        taskManagerReleasableCheck = null;
+        clusterReconciliationCheck = null;
         requirementsCheckFuture = null;
 
         started = false;
@@ -222,9 +222,9 @@ public class FineGrainedSlotManager implements SlotManager {
         started = true;
 
         if (resourceAllocator.isSupported()) {
-            taskManagerReleasableCheck =
+            clusterReconciliationCheck =
                     scheduledExecutor.scheduleWithFixedDelay(
-                            () -> 
mainThreadExecutor.execute(this::tryReleaseUnusedTaskManagers),
+                            () -> 
mainThreadExecutor.execute(this::checkClusterReconciliation),
                             0L,
                             taskManagerTimeout.toMilliseconds(),
                             TimeUnit.MILLISECONDS);
@@ -252,9 +252,9 @@ public class FineGrainedSlotManager implements SlotManager {
         slotManagerMetricGroup.close();
 
         // stop the timeout checks for the TaskManagers
-        if (taskManagerReleasableCheck != null) {
-            taskManagerReleasableCheck.cancel(false);
-            taskManagerReleasableCheck = null;
+        if (clusterReconciliationCheck != null) {
+            clusterReconciliationCheck.cancel(false);
+            clusterReconciliationCheck = null;
         }
 
         slotStatusSyncer.close();
@@ -291,7 +291,7 @@ public class FineGrainedSlotManager implements SlotManager {
         resourceTracker.notifyResourceRequirements(jobId, 
Collections.emptyList());
         if (resourceAllocator.isSupported()) {
             taskManagerTracker.clearPendingAllocationsOfJob(jobId);
-            checkTaskManagerReleasable();
+            checkResourcesNeedReconcile();
             declareNeededResourcesWithDelay();
         }
     }
@@ -623,7 +623,7 @@ public class FineGrainedSlotManager implements SlotManager {
             if (resourceAllocator.isSupported()
                     && !taskManagerTracker.getPendingTaskManagers().isEmpty()) 
{
                 
taskManagerTracker.replaceAllPendingAllocations(Collections.emptyMap());
-                checkTaskManagerReleasable();
+                checkResourcesNeedReconcile();
                 declareNeededResourcesWithDelay();
             }
             return;
@@ -679,7 +679,7 @@ public class FineGrainedSlotManager implements SlotManager {
         }
 
         if (resourceAllocator.isSupported()) {
-            checkTaskManagerReleasable();
+            checkResourcesNeedReconcile();
             declareNeededResourcesWithDelay();
         }
     }
@@ -812,29 +812,32 @@ public class FineGrainedSlotManager implements 
SlotManager {
     // Internal periodic check methods
     // 
---------------------------------------------------------------------------------------------
 
-    private void tryReleaseUnusedTaskManagers() {
-        if (checkTaskManagerReleasable()) {
+    private void checkClusterReconciliation() {
+        if (checkResourcesNeedReconcile()) {
             // only declare on needed.
             declareNeededResourcesWithDelay();
         }
     }
 
-    private boolean checkTaskManagerReleasable() {
-        ResourceReleaseResult releaseResult =
-                
resourceAllocationStrategy.tryReleaseUnusedResources(taskManagerTracker);
+    private boolean checkResourcesNeedReconcile() {
+        ResourceReconcileResult reconcileResult =
+                
resourceAllocationStrategy.tryReconcileClusterResources(taskManagerTracker);
 
-        releaseResult.getPendingTaskManagersToRelease().stream()
+        reconcileResult.getPendingTaskManagersToRelease().stream()
                 .map(PendingTaskManager::getPendingTaskManagerId)
                 .forEach(taskManagerTracker::removePendingTaskManager);
 
-        for (TaskManagerInfo taskManagerToRelease : 
releaseResult.getTaskManagersToRelease()) {
+        for (TaskManagerInfo taskManagerToRelease : 
reconcileResult.getTaskManagersToRelease()) {
             if (waitResultConsumedBeforeRelease) {
                 releaseIdleTaskExecutorIfPossible(taskManagerToRelease);
             } else {
                 releaseIdleTaskExecutor(taskManagerToRelease.getInstanceId());
             }
         }
-        return releaseResult.needRelease();
+
+        
reconcileResult.getPendingTaskManagersToAllocate().forEach(this::allocateResource);
+
+        return reconcileResult.needReconcile();
     }
 
     private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo 
taskManagerInfo) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
index 46f7453d941..05f0d1db7dd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
@@ -48,15 +48,15 @@ public interface ResourceAllocationStrategy {
             BlockedTaskManagerChecker blockedTaskManagerChecker);
 
     /**
-     * Try to make a release decision to release unused PendingTaskManagers 
and TaskManagers. This
-     * is more light weighted than {@link #tryFulfillRequirements}, only 
consider empty registered /
-     * pending workers and assume all requirements are fulfilled by registered 
/ pending workers.
+     * Try to make a decision to reconcile the cluster resources. This is more 
light weighted than
+     * {@link #tryFulfillRequirements}, only consider empty registered / 
pending workers and assume
+     * all requirements are fulfilled by registered / pending workers.
      *
      * @param taskManagerResourceInfoProvider provide the registered/pending 
resources of the
      *     current cluster
-     * @return a {@link ResourceReleaseResult} based on the current status, 
which contains the
+     * @return a {@link ResourceReconcileResult} based on the current status, 
which contains the
      *     actions to take
      */
-    ResourceReleaseResult tryReleaseUnusedResources(
+    ResourceReconcileResult tryReconcileClusterResources(
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java
similarity index 63%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java
index f1482fd1a87..e9ff50345cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java
@@ -22,17 +22,25 @@ import java.util.ArrayList;
 import java.util.List;
 
 /** Contains the results of the {@link ResourceAllocationStrategy}. */
-public class ResourceReleaseResult {
+public class ResourceReconcileResult {
+
+    private final List<PendingTaskManager> pendingTaskManagersToAllocate;
     private final List<PendingTaskManager> pendingTaskManagersToRelease;
     private final List<TaskManagerInfo> taskManagersToRelease;
 
-    public ResourceReleaseResult(
+    public ResourceReconcileResult(
+            List<PendingTaskManager> pendingTaskManagersToAllocate,
             List<PendingTaskManager> pendingTaskManagersToRelease,
             List<TaskManagerInfo> taskManagersToRelease) {
+        this.pendingTaskManagersToAllocate = pendingTaskManagersToAllocate;
         this.pendingTaskManagersToRelease = pendingTaskManagersToRelease;
         this.taskManagersToRelease = taskManagersToRelease;
     }
 
+    public List<PendingTaskManager> getPendingTaskManagersToAllocate() {
+        return pendingTaskManagersToAllocate;
+    }
+
     public List<PendingTaskManager> getPendingTaskManagersToRelease() {
         return pendingTaskManagersToRelease;
     }
@@ -41,8 +49,15 @@ public class ResourceReleaseResult {
         return taskManagersToRelease;
     }
 
-    public boolean needRelease() {
-        return !pendingTaskManagersToRelease.isEmpty() || 
!taskManagersToRelease.isEmpty();
+    /**
+     * Returns whether the cluster resource need reconcile.
+     *
+     * @return True if the cluster resource need reconcile, otherwise false.
+     */
+    public boolean needReconcile() {
+        return pendingTaskManagersToRelease.size() > 0
+                || taskManagersToRelease.size() > 0
+                || pendingTaskManagersToAllocate.size() > 0;
     }
 
     public static Builder builder() {
@@ -50,9 +65,15 @@ public class ResourceReleaseResult {
     }
 
     public static class Builder {
+        private final List<PendingTaskManager> pendingTaskManagersToAllocate = 
new ArrayList<>();
         private final List<PendingTaskManager> pendingTaskManagersToRelease = 
new ArrayList<>();
         private final List<TaskManagerInfo> taskManagersToRelease = new 
ArrayList<>();
 
+        public Builder addPendingTaskManagerToAllocate(PendingTaskManager 
pendingTaskManager) {
+            this.pendingTaskManagersToAllocate.add(pendingTaskManager);
+            return this;
+        }
+
         public Builder addPendingTaskManagerToRelease(PendingTaskManager 
pendingTaskManager) {
             this.pendingTaskManagersToRelease.add(pendingTaskManager);
             return this;
@@ -63,8 +84,11 @@ public class ResourceReleaseResult {
             return this;
         }
 
-        public ResourceReleaseResult build() {
-            return new ResourceReleaseResult(pendingTaskManagersToRelease, 
taskManagersToRelease);
+        public ResourceReconcileResult build() {
+            return new ResourceReconcileResult(
+                    pendingTaskManagersToAllocate,
+                    pendingTaskManagersToRelease,
+                    taskManagersToRelease);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
index f05de374192..18288cdabb8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
@@ -329,14 +329,15 @@ class DefaultResourceAllocationStrategyTest {
                                 () -> 
Collections.singleton(registeredTaskManager))
                         .build();
 
-        ResourceReleaseResult result =
-                
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+        ResourceReconcileResult result =
+                
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
 
         assertThat(result.getTaskManagersToRelease()).isEmpty();
 
         registeredTaskManager.setIdleSince(System.currentTimeMillis() - 10);
 
-        result = 
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+        result =
+                
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
         
assertThat(result.getTaskManagersToRelease()).containsExactly(registeredTaskManager);
     }
 
@@ -350,8 +351,8 @@ class DefaultResourceAllocationStrategyTest {
                                 () -> 
Collections.singleton(pendingTaskManager))
                         .build();
 
-        ResourceReleaseResult result =
-                
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+        ResourceReconcileResult result =
+                
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
 
         
assertThat(result.getPendingTaskManagersToRelease()).containsExactly(pendingTaskManager);
     }
@@ -369,8 +370,8 @@ class DefaultResourceAllocationStrategyTest {
                                 () -> 
Collections.singleton(pendingTaskManager))
                         .build();
 
-        ResourceReleaseResult result =
-                
ANY_MATCHING_STRATEGY.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+        ResourceReconcileResult result =
+                
ANY_MATCHING_STRATEGY.tryReconcileClusterResources(taskManagerResourceInfoProvider);
 
         assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
     }
@@ -440,13 +441,52 @@ class DefaultResourceAllocationStrategyTest {
                         .build();
 
         DefaultResourceAllocationStrategy strategy = createStrategy(1);
-        ResourceReleaseResult result =
-                
strategy.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+        ResourceReconcileResult result =
+                
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
         assertThat(result.getPendingTaskManagersToRelease())
                 .containsExactly(pendingTaskManagerIdle);
         
assertThat(result.getTaskManagersToRelease()).containsExactly(taskManagerIdle);
     }
 
+    @Test
+    void testRedundantResourceShouldBeFulfilled() {
+        final TaskManagerInfo taskManagerInUse =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(5),
+                        DEFAULT_SLOT_RESOURCE.multiply(2),
+                        DEFAULT_SLOT_RESOURCE);
+
+        final TestingTaskManagerInfo taskManagerIdle =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(5),
+                        DEFAULT_SLOT_RESOURCE.multiply(5),
+                        DEFAULT_SLOT_RESOURCE);
+        taskManagerIdle.setIdleSince(System.currentTimeMillis() - 10);
+
+        final PendingTaskManager pendingTaskManagerIdle =
+                new PendingTaskManager(DEFAULT_SLOT_RESOURCE.multiply(5), 
NUM_OF_SLOTS);
+
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(
+                                () -> Arrays.asList(taskManagerInUse, 
taskManagerIdle))
+                        .setPendingTaskManagersSupplier(
+                                () -> 
Collections.singletonList(pendingTaskManagerIdle))
+                        .build();
+
+        DefaultResourceAllocationStrategy strategy = createStrategy(4);
+        ResourceReconcileResult result =
+                
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
+
+        // pending task manager should reserved for redundant
+        assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
+        // both in use and idle task manager should be reserved for redundant
+        assertThat(result.getTaskManagersToRelease()).isEmpty();
+        // add two more pending task manager for redundant since total 
available resource equals
+        // 12(2+5+5)
+        assertThat(result.getPendingTaskManagersToAllocate()).hasSize(2);
+    }
+
     @Test
     void testRedundantResourceShouldBeReserved() {
         final TaskManagerInfo taskManagerInUse =
@@ -474,8 +514,8 @@ class DefaultResourceAllocationStrategyTest {
                         .build();
 
         DefaultResourceAllocationStrategy strategy = createStrategy(1);
-        ResourceReleaseResult result =
-                
strategy.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+        ResourceReconcileResult result =
+                
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
         // pending task manager should release at first
         assertThat(result.getPendingTaskManagersToRelease())
                 .containsExactly(pendingTaskManagerIdle);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
index 096425a34b4..3fbf1e51db2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
@@ -35,7 +35,7 @@ public class TestingResourceAllocationStrategy implements 
ResourceAllocationStra
                     ResourceAllocationResult>
             tryFulfillRequirementsFunction;
 
-    private final Function<TaskManagerResourceInfoProvider, 
ResourceReleaseResult>
+    private final Function<TaskManagerResourceInfoProvider, 
ResourceReconcileResult>
             tryReleaseUnusedResourcesFunction;
 
     private TestingResourceAllocationStrategy(
@@ -44,7 +44,7 @@ public class TestingResourceAllocationStrategy implements 
ResourceAllocationStra
                             TaskManagerResourceInfoProvider,
                             ResourceAllocationResult>
                     tryFulfillRequirementsFunction,
-            Function<TaskManagerResourceInfoProvider, ResourceReleaseResult>
+            Function<TaskManagerResourceInfoProvider, ResourceReconcileResult>
                     tryReleaseUnusedResourcesFunction) {
         this.tryFulfillRequirementsFunction =
                 Preconditions.checkNotNull(tryFulfillRequirementsFunction);
@@ -62,7 +62,7 @@ public class TestingResourceAllocationStrategy implements 
ResourceAllocationStra
     }
 
     @Override
-    public ResourceReleaseResult tryReleaseUnusedResources(
+    public ResourceReconcileResult tryReconcileClusterResources(
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
         return 
tryReleaseUnusedResourcesFunction.apply(taskManagerResourceInfoProvider);
     }
@@ -79,9 +79,9 @@ public class TestingResourceAllocationStrategy implements 
ResourceAllocationStra
                 tryFulfillRequirementsFunction =
                         (ignored0, ignored1) -> 
ResourceAllocationResult.builder().build();
 
-        private Function<TaskManagerResourceInfoProvider, 
ResourceReleaseResult>
+        private Function<TaskManagerResourceInfoProvider, 
ResourceReconcileResult>
                 tryReleaseUnusedResourcesFunction =
-                        ignored -> ResourceReleaseResult.builder().build();
+                        ignored -> ResourceReconcileResult.builder().build();
 
         public Builder setTryFulfillRequirementsFunction(
                 BiFunction<
@@ -94,7 +94,7 @@ public class TestingResourceAllocationStrategy implements 
ResourceAllocationStra
         }
 
         public Builder setTryReleaseUnusedResourcesFunction(
-                Function<TaskManagerResourceInfoProvider, 
ResourceReleaseResult>
+                Function<TaskManagerResourceInfoProvider, 
ResourceReconcileResult>
                         tryReleaseUnusedResourcesFunction) {
             this.tryReleaseUnusedResourcesFunction = 
tryReleaseUnusedResourcesFunction;
             return this;

Reply via email to