RocMarshal commented on code in PR #27189:
URL: https://github.com/apache/flink/pull/27189#discussion_r2499030304


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappings.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class is designed to handle the pre-matching of resource requests in 
the context of balanced
+ * task scheduling for streaming jobs. During the batch allocation of 
resources, where resource
+ * requests are allocated in a single, non-interleaved operation, it is 
impossible to make immediate
+ * individual adjustments to unmatched resource requests. This may lead to 
situations where not all
+ * resource requests can be successfully fulfilled. For example:
+ *
+ * <pre>
+ * resource requests:
+ *  - resource request-1: ResourceProfile-1(UNKNOWN)
+ *  - resource request-2: ResourceProfile-2(cpu=2 core, memory=2G)
+ *
+ * available slots:
+ *  - slot-a: ResourceProfile-a(cpu=1 core, memory=1G)
+ *  - slot-b: ResourceProfile-b(cpu=2 core, memory=2G)
+ * </pre>
+ *
+ * When the strategy {@link TasksBalancedRequestSlotMatchingStrategy} performs 
resource allocation,
+ * the following matching mapping might occur, preventing all slot requests 
from being successfully
+ * assigned in a consistent manner and thus hindering the scheduling of the 
entire job:
+ *
+ * <pre>
+ * the unexpected mapping case:
+ *   - resource request-1: ResourceProfile-1(UNKNOWN) was matched with slot-b: 
ResourceProfile-b(cpu=2 core, memory=2G)
+ *   - resource request-2: ResourceProfile-2(cpu=2 core, memory=2G) was not 
matched
+ * </pre>
+ *
+ * Therefore, it is crucial to determine how ResourceProfiles should match 
before the batch
+ * allocation of resource requests, aiming to assure the allocation 
successfully at least. An ideal
+ * matching relationship would be:
+ *
+ * <pre>
+ * - ResourceProfile-1(UNKNOWN)               -> ResourceProfile-a(cpu=1 core, 
memory=1G)
+ * - ResourceProfile-2(cpu=2 core, memory=2G) -> ResourceProfile-b(cpu=2 core, 
memory=2G)
+ * </pre>
+ *
+ * This is the motivation for introducing the current class.
+ */
+final class ResourceRequestPreMappings {
+
+    private final boolean allMatchable;
+    // The global variable to keep base mappings result related information, 
which can assure that
+    // the allocation for all requests could be run successfully at least.
+    private final Map<ResourceProfile, Map<ResourceProfile, Integer>>
+            baseRequiredResourcePreMappings;
+    // The global variable to keep the remaining available flexible resources 
besides the
+    // baseRequiredResourcePreMappings.
+    private final Map<ResourceProfile, Integer> remainingFlexibleResources;
+
+    private ResourceRequestPreMappings(
+            boolean allMatchable,
+            final Map<ResourceProfile, Map<ResourceProfile, Integer>>
+                    baseRequiredResourcePreMappings,
+            final Map<ResourceProfile, Integer> remainingFlexibleResources) {
+        this.allMatchable = allMatchable;
+
+        this.baseRequiredResourcePreMappings =
+                
CollectionUtil.newHashMapWithExpectedSize(baseRequiredResourcePreMappings.size());
+        
this.baseRequiredResourcePreMappings.putAll(baseRequiredResourcePreMappings);
+
+        this.remainingFlexibleResources =
+                
CollectionUtil.newHashMapWithExpectedSize(remainingFlexibleResources.size());
+        this.remainingFlexibleResources.putAll(remainingFlexibleResources);
+    }
+
+    static ResourceRequestPreMappings build(
+            Collection<PendingRequest> pendingRequests, Collection<? extends 
PhysicalSlot> slots) {
+        return new ResourceRequestPreMappingsBuilder(pendingRequests, 
slots).build();
+    }
+
+    boolean isAllMatchable() {
+        return allMatchable;
+    }
+
+    boolean hasAvailableProfile(
+            ResourceProfile requiredResourceProfile, ResourceProfile 
acquirableResourceProfile) {
+        // Check for base mappings first
+        Map<ResourceProfile, Integer> basePreMapping =
+                baseRequiredResourcePreMappings.getOrDefault(
+                        requiredResourceProfile, new HashMap<>());
+        Integer remainingCnt = 
basePreMapping.getOrDefault(acquirableResourceProfile, 0);
+
+        if (remainingCnt > 0) {
+            return true;
+        } else {
+            return 
remainingFlexibleResources.getOrDefault(acquirableResourceProfile, 0) > 0;
+        }
+    }
+
+    void decrease(
+            ResourceProfile requiredResourceProfile, ResourceProfile 
acquiredResourceProfile) {
+        Map<ResourceProfile, Integer> basePreMapping =
+                baseRequiredResourcePreMappings.getOrDefault(
+                        requiredResourceProfile, new HashMap<>());
+        Integer remainingCntOfBaseMappings =
+                basePreMapping.getOrDefault(acquiredResourceProfile, 0);
+        Integer remainingCntOfFlexibleResources =
+                
remainingFlexibleResources.getOrDefault(acquiredResourceProfile, 0);
+
+        Preconditions.checkState(
+                remainingCntOfBaseMappings > 0 || 
remainingCntOfFlexibleResources > 0,
+                "Remaining acquired resource profile %s to match %s is not 
enough.",
+                acquiredResourceProfile,
+                requiredResourceProfile);
+
+        if (remainingCntOfBaseMappings > 0) {
+            basePreMapping.put(acquiredResourceProfile, 
remainingCntOfBaseMappings - 1);
+            return;
+        }
+
+        if (remainingCntOfFlexibleResources > 0) {
+            remainingFlexibleResources.put(
+                    acquiredResourceProfile, remainingCntOfFlexibleResources - 
1);
+            // release a resource back to remainingFlexibleResources.
+            adjustBaseToRemainingFlexibleResources(basePreMapping);
+        }
+    }
+
+    private void adjustBaseToRemainingFlexibleResources(
+            Map<ResourceProfile, Integer> basePreMapping) {
+        Optional<Map.Entry<ResourceProfile, Integer>> 
releasableOptOfBaseMappings =
+                basePreMapping.entrySet().stream()
+                        .filter(entry -> entry.getValue() > 0)
+                        .findFirst();
+        Preconditions.checkState(
+                releasableOptOfBaseMappings.isPresent(),
+                "No releasable mapping found in the base mappings between 
resources and requests.");
+        Map.Entry<ResourceProfile, Integer> releasable = 
releasableOptOfBaseMappings.get();
+        ResourceProfile releasableResourceProfile = releasable.getKey();
+
+        basePreMapping.put(releasableResourceProfile, releasable.getValue() - 
1);
+
+        remainingFlexibleResources.compute(
+                releasableResourceProfile,
+                (resourceProfile, oldValue) -> oldValue == null ? 1 : oldValue 
+ 1);
+    }
+
+    @VisibleForTesting
+    static ResourceRequestPreMappings build(
+            boolean allMatchable,
+            final Map<ResourceProfile, Map<ResourceProfile, Integer>>
+                    baseRequiredResourcePreMappings,
+            final Map<ResourceProfile, Integer> remainingFlexibleResources) {
+        return new ResourceRequestPreMappings(
+                allMatchable, baseRequiredResourcePreMappings, 
remainingFlexibleResources);
+    }
+
+    @VisibleForTesting
+    Map<ResourceProfile, Map<ResourceProfile, Integer>> 
getBaseRequiredResourcePreMappings() {
+        return Collections.unmodifiableMap(baseRequiredResourcePreMappings);
+    }
+
+    @VisibleForTesting
+    int getAvailableResourceCntOfBasePreMappings(
+            ResourceProfile requiredResourceProfile, ResourceProfile 
acquirableResourceProfile) {
+        return baseRequiredResourcePreMappings
+                .getOrDefault(requiredResourceProfile, new HashMap<>())
+                .getOrDefault(acquirableResourceProfile, 0);
+    }
+
+    @VisibleForTesting
+    Map<ResourceProfile, Integer> getRemainingFlexibleResources() {
+        return Collections.unmodifiableMap(remainingFlexibleResources);
+    }
+
+    @VisibleForTesting
+    int getAvailableResourceCntOfRemainingFlexibleMapping(
+            ResourceProfile availableResourceProfile) {
+        return 
remainingFlexibleResources.getOrDefault(availableResourceProfile, 0);
+    }
+
+    private static final class ResourceRequestPreMappingsBuilder {

Review Comment:
   Thanks.
   
   @davidradl If there are no more comments, I'll move on to the next step of
the review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to