KarmaGYZ commented on code in PR #22913:
URL: https://github.com/apache/flink/pull/22913#discussion_r1268872219


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Track all free slots, support bookkeeping slot for {@link 
SlotSelectionStrategy}. */
+public class FreeSlotInfoTracker {

Review Comment:
   We can create an interface `FreeSlotInfoTracker` and implement it as 
`DefaultFreeSlotInfoTracker` in this PR. That will ease future extension and 
testing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Track all free slots, support bookkeeping slot for {@link 
SlotSelectionStrategy}. */
+public class FreeSlotInfoTracker {
+    private final Set<AllocationID> freeSlots;
+    private final Function<AllocationID, SlotInfo> slotInfoLookup;
+    private final Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> 
freeSlotInfoLookup;
+    private final Function<ResourceID, Double> taskExecutorUtilizationLookup;
+
+    public FreeSlotInfoTracker(
+            Set<AllocationID> freeSlots,
+            Function<AllocationID, SlotInfo> slotInfoLookup,
+            Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> 
freeSlotInfoLookup,
+            Function<ResourceID, Double> taskExecutorUtilizationLookup) {
+        this.freeSlots = new HashSet<>(freeSlots);
+        this.slotInfoLookup = slotInfoLookup;
+        this.freeSlotInfoLookup = freeSlotInfoLookup;
+        this.taskExecutorUtilizationLookup = taskExecutorUtilizationLookup;
+    }
+
+    /**
+     * Get allocation id of all available slots.
+     *
+     * @return allocation id of available slots
+     */
+    public Set<AllocationID> getAvailableSlots() {
+        return Collections.unmodifiableSet(freeSlots);
+    }
+
+    /**
+     * Get slot info by allocation id, this slot must exist.
+     *
+     * @param allocationId to get SlotInfo
+     * @return slot info for the allocation id
+     */
+    public SlotInfo getSlotInfo(AllocationID allocationId) {
+        return Preconditions.checkNotNull(slotInfoLookup.apply(allocationId));
+    }
+
+    /**
+     * Returns a list of {@link AllocatedSlotPool.FreeSlotInfo} objects about 
all slots with slot
+     * idle since that are currently available in the slot pool.
+     *
+     * @return a list of {@link AllocatedSlotPool.FreeSlotInfo} objects about 
all slots with slot *

Review Comment:
   ```suggestion
        * @return a list of {@link AllocatedSlotPool.FreeSlotInfo} objects 
about all slots with slot
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Track all free slots, support bookkeeping slot for {@link 
SlotSelectionStrategy}. */
+public class FreeSlotInfoTracker {

Review Comment:
   Then, we can add a `TestingFreeSlotInfoTracker` along with a Builder to 
replace the `FreeSlotInfoTrackerTestUtils`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java:
##########
@@ -225,35 +236,63 @@ private void cancelLogicalSlotRequest(ExecutionVertexID 
executionVertexId, Throw
         return assignments;
     }
 
-    private SharedSlot getOrAllocateSharedSlot(
-            ExecutionSlotSharingGroup executionSlotSharingGroup,
+    private Map<ExecutionSlotSharingGroup, SharedSlot> 
tryAssignExistingSharedSlots(
+            Set<ExecutionSlotSharingGroup> executionSlotSharingGroups) {
+        Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =
+                new HashMap<>(executionSlotSharingGroups.size());
+        for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) {
+            SharedSlot sharedSlot = sharedSlots.get(group);
+            if (sharedSlot != null) {
+                assignedSlots.put(group, sharedSlot);
+            }
+        }
+        return assignedSlots;
+    }
+
+    private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots(
+            Set<ExecutionSlotSharingGroup> executionSlotSharingGroups,
             SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-        return sharedSlots.computeIfAbsent(
-                executionSlotSharingGroup,
-                group -> {
-                    SlotRequestId physicalSlotRequestId = new SlotRequestId();
-                    ResourceProfile physicalSlotResourceProfile =
-                            getPhysicalSlotResourceProfile(group);
-                    SlotProfile slotProfile =
-                            sharedSlotProfileRetriever.getSlotProfile(
-                                    group, physicalSlotResourceProfile);
-                    PhysicalSlotRequest physicalSlotRequest =
-                            new PhysicalSlotRequest(
-                                    physicalSlotRequestId,
-                                    slotProfile,
-                                    slotWillBeOccupiedIndefinitely);
+
+        Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = new 
HashMap<>();
+        Map<SlotRequestId, PhysicalSlotRequest> slotRequests = new HashMap<>();

Review Comment:
   Why do we define it as a map? It seems the key set will never be used.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java:
##########
@@ -67,43 +69,71 @@ public class SimpleExecutionSlotAllocator implements 
ExecutionSlotAllocator {
     @Override
     public List<ExecutionSlotAssignment> allocateSlotsFor(
             List<ExecutionAttemptID> executionAttemptIds) {
-        return executionAttemptIds.stream()
-                .map(id -> new ExecutionSlotAssignment(id, 
allocateSlotFor(id)))
-                .collect(Collectors.toList());
+        List<ExecutionSlotAssignment> result = new 
ArrayList<>(executionAttemptIds.size());
+
+        Map<SlotRequestId, ExecutionAttemptID> 
remainingExecutionsToSlotRequest =
+                new HashMap<>(executionAttemptIds.size());
+        List<PhysicalSlotRequest> physicalSlotRequests =
+                new ArrayList<>(executionAttemptIds.size());
+
+        for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
+            if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
+                result.add(
+                        new ExecutionSlotAssignment(
+                                executionAttemptId,
+                                
requestedPhysicalSlots.getValueByKeyA(executionAttemptId)));
+            } else {
+                final SlotRequestId slotRequestId = new SlotRequestId();
+                final ResourceProfile resourceProfile =
+                        resourceProfileRetriever.apply(executionAttemptId);
+                final SlotProfile slotProfile =
+                        SlotProfile.priorAllocation(
+                                resourceProfile,
+                                resourceProfile,
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptySet());
+                final PhysicalSlotRequest request =
+                        new PhysicalSlotRequest(
+                                slotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
+                physicalSlotRequests.add(request);
+                remainingExecutionsToSlotRequest.put(slotRequestId, 
executionAttemptId);
+            }
+        }
+
+        allocatePhysicalSlotsFor(remainingExecutionsToSlotRequest, 
physicalSlotRequests, result);
+
+        return result;
     }
 
-    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID 
executionAttemptId) {
-        if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
-            return requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
-        }
-        final SlotRequestId slotRequestId = new SlotRequestId();
-        final ResourceProfile resourceProfile = 
resourceProfileRetriever.apply(executionAttemptId);
-        final SlotProfile slotProfile =
-                SlotProfile.priorAllocation(
-                        resourceProfile,
-                        resourceProfile,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Collections.emptySet());
-        final PhysicalSlotRequest request =
-                new PhysicalSlotRequest(slotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
-        final CompletableFuture<LogicalSlot> slotFuture =
-                slotProvider
-                        .allocatePhysicalSlot(request)
-                        .thenApply(
-                                physicalSlotRequest ->
-                                        allocateLogicalSlotFromPhysicalSlot(
-                                                slotRequestId,
-                                                
physicalSlotRequest.getPhysicalSlot(),
-                                                
slotWillBeOccupiedIndefinitely));
-        slotFuture.exceptionally(
-                throwable -> {
-                    this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
-                    this.slotProvider.cancelSlotRequest(slotRequestId, 
throwable);
-                    return null;
+    private void allocatePhysicalSlotsFor(

Review Comment:
   ```suggestion
       private List<ExecutionSlotAssignment> allocatePhysicalSlotsFor(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java:
##########
@@ -67,43 +69,71 @@ public class SimpleExecutionSlotAllocator implements 
ExecutionSlotAllocator {
     @Override
     public List<ExecutionSlotAssignment> allocateSlotsFor(
             List<ExecutionAttemptID> executionAttemptIds) {
-        return executionAttemptIds.stream()
-                .map(id -> new ExecutionSlotAssignment(id, 
allocateSlotFor(id)))
-                .collect(Collectors.toList());
+        List<ExecutionSlotAssignment> result = new 
ArrayList<>(executionAttemptIds.size());
+
+        Map<SlotRequestId, ExecutionAttemptID> 
remainingExecutionsToSlotRequest =
+                new HashMap<>(executionAttemptIds.size());
+        List<PhysicalSlotRequest> physicalSlotRequests =
+                new ArrayList<>(executionAttemptIds.size());
+
+        for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
+            if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
+                result.add(
+                        new ExecutionSlotAssignment(
+                                executionAttemptId,
+                                
requestedPhysicalSlots.getValueByKeyA(executionAttemptId)));
+            } else {
+                final SlotRequestId slotRequestId = new SlotRequestId();
+                final ResourceProfile resourceProfile =
+                        resourceProfileRetriever.apply(executionAttemptId);
+                final SlotProfile slotProfile =
+                        SlotProfile.priorAllocation(
+                                resourceProfile,
+                                resourceProfile,
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptySet());
+                final PhysicalSlotRequest request =
+                        new PhysicalSlotRequest(
+                                slotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
+                physicalSlotRequests.add(request);
+                remainingExecutionsToSlotRequest.put(slotRequestId, 
executionAttemptId);
+            }
+        }
+
+        allocatePhysicalSlotsFor(remainingExecutionsToSlotRequest, 
physicalSlotRequests, result);
+
+        return result;
     }
 
-    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID 
executionAttemptId) {
-        if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
-            return requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
-        }
-        final SlotRequestId slotRequestId = new SlotRequestId();
-        final ResourceProfile resourceProfile = 
resourceProfileRetriever.apply(executionAttemptId);
-        final SlotProfile slotProfile =
-                SlotProfile.priorAllocation(
-                        resourceProfile,
-                        resourceProfile,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Collections.emptySet());
-        final PhysicalSlotRequest request =
-                new PhysicalSlotRequest(slotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
-        final CompletableFuture<LogicalSlot> slotFuture =
-                slotProvider
-                        .allocatePhysicalSlot(request)
-                        .thenApply(
-                                physicalSlotRequest ->
-                                        allocateLogicalSlotFromPhysicalSlot(
-                                                slotRequestId,
-                                                
physicalSlotRequest.getPhysicalSlot(),
-                                                
slotWillBeOccupiedIndefinitely));
-        slotFuture.exceptionally(
-                throwable -> {
-                    this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
-                    this.slotProvider.cancelSlotRequest(slotRequestId, 
throwable);
-                    return null;
+    private void allocatePhysicalSlotsFor(

Review Comment:
   The caller can append the returned list to its result list.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Track all free slots, support bookkeeping slot for {@link 
SlotSelectionStrategy}. */
+public class FreeSlotInfoTracker {

Review Comment:
   `createTestingFreeSlotInfoTracker` can remain as a utility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to