tillrohrmann commented on a change in pull request #13181: URL: https://github.com/apache/flink/pull/13181#discussion_r483678060
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.util.clock.Clock; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Default implementation of {@link PhysicalSlotRequestBulkChecker}. 
+ */ +public class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBulkChecker { + + private ComponentMainThreadExecutor componentMainThreadExecutor; Review comment: I'd initialize this field with a `DummyComponentMainThreadExecutor` to ensure that `start` is called before any other action is called on it. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java ########## @@ -38,6 +38,26 @@ import java.util.function.Consumer; import java.util.function.Function; +/** + * Shared slot implementation for the {@link SlotSharingExecutionSlotAllocator}. + * + * <p>The shared slots are owned and tracked by {@link SlotSharingExecutionSlotAllocator}. + * The shared slot represents a collection of {@link SingleLogicalSlot} requests which share one physical slot. + * The shared slot is created by the {@link SlotSharingExecutionSlotAllocator} from the physical slot request. + * Afterwards, {@link SlotSharingExecutionSlotAllocator} requests logical slots from the underlying physical slot + * for execution which share it. Review comment: ```suggestion * for {@link Execution executions} which share it. ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.util.clock.Clock; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBulkChecker { + + private ComponentMainThreadExecutor componentMainThreadExecutor; + + private final Supplier<Set<SlotInfo>> slotsRetriever; + + private final Clock clock; + + PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) { + this.slotsRetriever = checkNotNull(slotsRetriever); + this.clock = checkNotNull(clock); + } + + void start(final ComponentMainThreadExecutor mainThreadExecutor) { + this.componentMainThreadExecutor = mainThreadExecutor; + } + + @Override + public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time timeout) { + PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new 
PhysicalSlotRequestBulkWithTimestamp(bulk); + bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis()); + schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp, timeout); + } + + private void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp bulk, Time timeout) { + componentMainThreadExecutor.schedule(() -> { + TimeoutCheckResult result = checkPhysicalSlotRequestBulkTimeout(bulk, timeout); + + switch (result) { + case PENDING: + //re-schedule the timeout check + schedulePendingRequestBulkTimeoutCheck(bulk, timeout); + break; + case TIMEOUT: + bulk.cancel(new TimeoutException("Slot request bulk is not fulfillable!")); + break; + case FULFILLED: + default: + // no action to take + break; + } + }, timeout.getSize(), timeout.getUnit()); + } + + /** + * Check the slot request bulk and timeout its requests if it has been unfulfillable for too long. + * @param slotRequestBulk bulk of slot requests + * @param slotRequestTimeout indicates how long a pending request can be unfulfillable + * @return result of the check, indicating the bulk is fulfilled, still pending, or timed out + */ + @VisibleForTesting + TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout( + final PhysicalSlotRequestBulkWithTimestamp slotRequestBulk, + final Time slotRequestTimeout) { + + if (slotRequestBulk.getPendingRequests().isEmpty()) { + return TimeoutCheckResult.FULFILLED; + } + + final boolean fulfillable = isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever); + if (fulfillable) { + slotRequestBulk.markFulfillable(); + } else { + final long currentTimestamp = clock.relativeTimeMillis(); + + slotRequestBulk.markUnfulfillable(currentTimestamp); + + final long unfulfillableSince = slotRequestBulk.getUnfulfillableSince(); + if (unfulfillableSince + slotRequestTimeout.toMilliseconds() <= currentTimestamp) { + return TimeoutCheckResult.TIMEOUT; + } + } + + return TimeoutCheckResult.PENDING; + } + + /** + * Returns whether the given bulk of slot requests 
are possible to be fulfilled at the same time + * with all the reusable slots in the slot pool. A reusable slot means the slot is available or + * will not be occupied indefinitely. + * + * @param slotRequestBulk bulk of slot requests to check + * @param slotsRetriever supplies slots to be used for the fulfill-ability check + * @return true if the slot requests are possible to be fulfilled, otherwise false + */ + @VisibleForTesting + static boolean isSlotRequestBulkFulfillable( + final PhysicalSlotRequestBulk slotRequestBulk, + final Supplier<Set<SlotInfo>> slotsRetriever) { + + final Set<AllocationID> assignedSlots = slotRequestBulk.getAllocationIdsOfFulfilledRequests(); + final Set<SlotInfo> reusableSlots = getReusableSlots(slotsRetriever, assignedSlots); + return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(), reusableSlots); + } + + private static Set<SlotInfo> getReusableSlots( + final Supplier<Set<SlotInfo>> slotsRetriever, + final Set<AllocationID> slotsToExclude) { + + return slotsRetriever.get().stream() + .filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely()) + .filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId())) + .collect(Collectors.toSet()); + } + + private static boolean areRequestsFulfillableWithSlots( + final Collection<ResourceProfile> requestResourceProfiles, + final Set<SlotInfo> slots) { + + final Set<SlotInfo> remainingSlots = new HashSet<>(slots); + for (ResourceProfile requestResourceProfile : requestResourceProfiles) { + final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(requestResourceProfile, remainingSlots); + if (matchedSlot.isPresent()) { + remainingSlots.remove(matchedSlot.get()); + } else { + return false; + } + } Review comment: Maybe add a comment stating the complexity. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.TestingPhysicalSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId; +import static org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.createExecutionSlotSharingGroup; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link SharedSlot}. + */ +public class SharedSlotTest { Review comment: ```suggestion public class SharedSlotTest extends TestLogger { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java ########## @@ -138,27 +133,38 @@ @Override public void cancel(ExecutionVertexID executionVertexId) { + cancelLogicalSlotRequest(executionVertexId, null); + } + + private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throwable cause) { ExecutionSlotSharingGroup executionSlotSharingGroup = slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexId); checkNotNull( executionSlotSharingGroup, "There is no ExecutionSlotSharingGroup for ExecutionVertexID " + executionVertexId); SharedSlot slot = sharedSlots.get(executionSlotSharingGroup); if (slot != null) { - slot.cancelLogicalSlotRequest(executionVertexId, null); + slot.cancelLogicalSlotRequest(executionVertexId, cause); } else { - LOG.debug("There is no slot for ExecutionSlotSharingGroup of ExecutionVertexID {}", executionVertexId); + LOG.debug("There is no SharedSlot for ExecutionSlotSharingGroup of ExecutionVertexID {}", executionVertexId); } } - private Stream<SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlot( + private Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots( SharedSlotProfileRetriever sharedSlotProfileRetriever, - ExecutionSlotSharingGroup executionSlotSharingGroup, - Collection<ExecutionVertexID> executions) { - SharedSlot sharedSlot = 
getOrAllocateSharedSlot(executionSlotSharingGroup, sharedSlotProfileRetriever); - return executions - .stream() - .map(execution -> new SlotExecutionVertexAssignment(execution, sharedSlot.allocateLogicalSlot(execution))); + Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup) { + Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments = new HashMap<>(); + for (Map.Entry<ExecutionSlotSharingGroup, List<ExecutionVertexID>> entry : executionsByGroup.entrySet()) { + ExecutionSlotSharingGroup group = entry.getKey(); + List<ExecutionVertexID> executionIds = entry.getValue(); + SharedSlot sharedSlot = getOrAllocateSharedSlot(group, sharedSlotProfileRetriever); + for (ExecutionVertexID executionId : executionIds) { + CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId); + SlotExecutionVertexAssignment assignment = new SlotExecutionVertexAssignment(executionId, logicalSlotFuture); + assignments.put(executionId, assignment); + } Review comment: nit: Some empty lines could make it a bit more readable. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulkTest.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId; +import static org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.createExecutionSlotSharingGroup; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertThat; + +/** + * Test suite for {@link SharingPhysicalSlotRequestBulk}. + */ +public class SharingPhysicalSlotRequestBulkTest { Review comment: ```suggestion public class SharingPhysicalSlotRequestBulkTest extends TestLogger { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ########## @@ -102,20 +91,6 @@ default void start(ComponentMainThreadExecutor mainThreadExecutor) { allocationTimeout); } - /** - * Allocates a bulk of physical slots. The allocation will be completed - * normally only when all the requests are fulfilled. 
- * - * @param physicalSlotRequests requests for physical slots - * @param timeout indicating how long it is accepted that the slot requests can be unfulfillable - * @return future of the results of slot requests - */ - default CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots( - Collection<PhysicalSlotRequest> physicalSlotRequests, - Time timeout) { - throw new UnsupportedOperationException("Not properly implemented."); - } - /** * Cancels the slot request with the given {@link SlotRequestId} and {@link SlotSharingGroupId}. * Review comment: Can default `void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause)` be removed? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java ########## @@ -38,6 +38,26 @@ import java.util.function.Consumer; import java.util.function.Function; +/** + * Shared slot implementation for the {@link SlotSharingExecutionSlotAllocator}. + * + * <p>The shared slots are owned and tracked by {@link SlotSharingExecutionSlotAllocator}. + * The shared slot represents a collection of {@link SingleLogicalSlot} requests which share one physical slot. + * The shared slot is created by the {@link SlotSharingExecutionSlotAllocator} from the physical slot request. + * Afterwards, {@link SlotSharingExecutionSlotAllocator} requests logical slots from the underlying physical slot + * for execution which share it. + * + * <p>The shared slot becomes a {@link PhysicalSlot.Payload} of its underlying physical slot + * once the physical slot is obtained. If the allocated physical slot gets released then it calls back the shared slot + * to release the logical slots which fail their execution payloads. + * + * <p>A logical slot request can be cancelled if it is not completed yet or returned by the execution + * if it has been completed and given to the execution by {@link SlotSharingExecutionSlotAllocator}. 
+ * If the underlying physical slot fails, it fails all logical slot requests. + * The failed, cancelled or returned logical slot requests are removed from the shared slot. + * Once the shared slot has no registered logical slot requests, it calls back its {@link SlotSharingExecutionSlotAllocator} + * to remove it from the allocator and cancel its underlying physical slot request if the request is not fulfilled yet. Review comment: Nice :-) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
