dmvk commented on a change in pull request #18286: URL: https://github.com/apache/flink/pull/18286#discussion_r781301276
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import java.util.Collection; + +/** Strategy to match slot requests to slots. */ +public interface RequestSlotMatchingStrategy { Review comment: ```suggestion interface RequestSlotMatchingStrategy { ``` Should this interface be package-private, just as `PendingRequest` and the methods of the `RequestSlotMatching` class are? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ########## @@ -177,9 +199,18 @@ void start( * requested batch slot * @return a future which is completed with newly allocated batch slot */ + @Nonnull + default CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot( + @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) { Review comment: Should we get rid of the `@Nonnull` annotations here? We should implicitly treat everything as non-null and only annotate `@Nullable` parameters, to reduce noise. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java ########## @@ -45,15 +50,29 @@ import static org.junit.Assert.assertThat; /** Tests for the {@link DeclarativeSlotPoolBridge}. */ +@RunWith(Parameterized.class) public class DeclarativeSlotPoolBridgeResourceDeclarationTest extends TestLogger { private static final JobMasterId jobMasterId = JobMasterId.generate(); private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); + private final RequestSlotMatchingStrategy requestSlotMatchingStrategy; private RequirementListener requirementListener; private DeclarativeSlotPoolBridge declarativeSlotPoolBridge; + @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}") + public static Collection<RequestSlotMatchingStrategy> data() throws IOException { Review comment: 👍 ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java ########## @@ -55,13 +59,26 @@ import static org.junit.Assert.fail; /** Tests for the {@link DeclarativeSlotPoolBridge}. 
*/ +@RunWith(Parameterized.class) public class DeclarativeSlotPoolBridgeTest extends TestLogger { private static final Time rpcTimeout = Time.seconds(20); private static final JobID jobId = new JobID(); private static final JobMasterId jobMasterId = JobMasterId.generate(); private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); + private final RequestSlotMatchingStrategy requestSlotMatchingStrategy; + + @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}") + public static Collection<RequestSlotMatchingStrategy> data() throws IOException { + return Arrays.asList( + SimpleRequestSlotMatchingStrategy.INSTANCE, + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE); + } + + public DeclarativeSlotPoolBridgeTest(RequestSlotMatchingStrategy requestSlotMatchingStrategy) { + this.requestSlotMatchingStrategy = requestSlotMatchingStrategy; Review comment: This verifies that the different slot matching strategies don't break anything. Should we also test that the configured strategy is actually taken into account? (The same applies to `DeclarativeSlotPoolBridgeResourceDeclarationTest.java`.) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.jobmaster.SlotRequestId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; + +/** + * Simple implementation of the {@link RequestSlotMatchingStrategy} that matches the pending + * requests in order as long as the resource profile can be fulfilled. + */ +public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy { + INSTANCE; + + @Override + public Collection<RequestSlotMatching> matchRequestsAndSlots( + Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) { + final Collection<RequestSlotMatching> resultingMatchings = new ArrayList<>(); + + // if pendingRequests has a special order, then let's preserve it + final LinkedHashMap<SlotRequestId, PendingRequest> pendingRequestsIndex = Review comment: Why do we need an index here? Are we expecting duplicates? Can we simply replace this with a LinkedList? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.jobmaster.SlotRequestId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; + +/** + * Simple implementation of the {@link RequestSlotMatchingStrategy} that matches the pending + * requests in order as long as the resource profile can be fulfilled. + */ +public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy { + INSTANCE; + + @Override + public Collection<RequestSlotMatching> matchRequestsAndSlots( + Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) { + final Collection<RequestSlotMatching> resultingMatchings = new ArrayList<>(); Review comment: nit: ```suggestion final Collection<RequestSlotMatching> resultingMatches = new ArrayList<>(); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import java.util.Collection; + +/** Strategy to match slot requests to slots. */ +public interface RequestSlotMatchingStrategy { + + /** + * Match the given slots with the given collection of pending requests. + * + * @param slots slots to match + * @param pendingRequests slot requests to match + * @return resulting matchings of this operation + */ + Collection<RequestSlotMatching> matchRequestsAndSlots( + Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests); + + /** Result class representing matchings. */ + final class RequestSlotMatching { Review comment: ```suggestion final class RequestSlotMatch { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ########## @@ -162,9 +163,30 @@ void start( * @return a newly allocated slot that was previously not available. */ @Nonnull + default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot( + @Nonnull SlotRequestId slotRequestId, + @Nonnull ResourceProfile resourceProfile, + @Nullable Time timeout) { + return requestNewAllocatedSlot( + slotRequestId, resourceProfile, Collections.emptyList(), timeout); + } + + /** + * Request the allocation of a new slot from the resource manager. 
This method will not return a + * slot from the already available slots from the pool, but instead will add a new slot to that + * pool that is immediately allocated and returned. + * + * @param slotRequestId identifying the requested slot + * @param resourceProfile resource profile that specifies the resource requirements for the + * requested slot + * @param preferredAllocations preferred allocations for the new allocated slot + * @param timeout timeout for the allocation procedure + * @return a newly allocated slot that was previously not available. + */ CompletableFuture<PhysicalSlot> requestNewAllocatedSlot( @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, + @Nonnull Collection<AllocationID> preferredAllocations, Review comment: I'm wondering whether we should also make use of preferred locations as they can potentially address the data locality issue as well 🤔 ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmaster.SlotRequestId; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +final class PendingRequest { + + private final SlotRequestId slotRequestId; + + private final ResourceProfile resourceProfile; + + private final HashSet<AllocationID> preferredAllocations; + + private final CompletableFuture<PhysicalSlot> slotFuture; + + private final boolean isBatchRequest; + + private long unfulfillableSince; + + private PendingRequest( + SlotRequestId slotRequestId, + ResourceProfile resourceProfile, + Collection<AllocationID> preferredAllocations, + boolean isBatchRequest) { + this.slotRequestId = slotRequestId; + this.resourceProfile = resourceProfile; + this.preferredAllocations = new HashSet<>(preferredAllocations); + this.isBatchRequest = isBatchRequest; + this.slotFuture = new CompletableFuture<>(); + this.unfulfillableSince = Long.MAX_VALUE; + } + + static PendingRequest createBatchRequest( + SlotRequestId slotRequestId, + ResourceProfile resourceProfile, + Collection<AllocationID> preferredAllocations) { + return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, true); + } + + static PendingRequest createNormalRequest( + SlotRequestId slotRequestId, + ResourceProfile resourceProfile, + Collection<AllocationID> preferredAllocations) { + return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, false); + } + + SlotRequestId getSlotRequestId() { + return slotRequestId; + } + + ResourceProfile getResourceProfile() { + return resourceProfile; + } + + public Set<AllocationID> getPreferredAllocations() { + return preferredAllocations; + } + + CompletableFuture<PhysicalSlot> getSlotFuture() { + return 
slotFuture; + } + + void failRequest(Exception cause) { + slotFuture.completeExceptionally(cause); + } + + public boolean isBatchRequest() { + return isBatchRequest; + } + + public void markFulfillable() { Review comment: Should we make the visibility of these methods consistent, i.e. either all public or all package-private (the same as the class)? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java ########## @@ -102,12 +103,16 @@ public SlotProfile getSlotProfile( preferredLocationsRetriever.getPreferredLocations( execution, producersToIgnore)); } - return SlotProfile.priorAllocation( - physicalSlotResourceProfile, - physicalSlotResourceProfile, - preferredLocations, - priorAllocations, - reservedAllocationIds); + + final SlotProfile slotProfile = Review comment: Why do we need this change? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmaster.SlotRequestId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * {@link RequestSlotMatchingStrategy} that takes the preferred allocations into account. The + * strategy will try to fulfill the preferred allocations and if this is not possible, then it will + * fall back to {@link SimpleRequestSlotMatchingStrategy}. + */ +public enum PreferredAllocationRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy { + INSTANCE; + + @Override + public Collection<RequestSlotMatching> matchRequestsAndSlots( + Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) { + final Collection<RequestSlotMatching> requestSlotMatchings = new ArrayList<>(); + + final Map<AllocationID, PhysicalSlot> freeSlots = + slots.stream() + .collect( + Collectors.toMap( + PhysicalSlot::getAllocationId, Function.identity())); + + final Map<SlotRequestId, PendingRequest> pendingRequestsWithPreferredAllocations = + new HashMap<>(); + final List<PendingRequest> unmatchedRequests = new ArrayList<>(); + + // Split requests into those that have preferred allocations and those that don't have + for (PendingRequest pendingRequest : pendingRequests) { + if (pendingRequest.getPreferredAllocations().isEmpty()) { + unmatchedRequests.add(pendingRequest); + } else { + pendingRequestsWithPreferredAllocations.put( + pendingRequest.getSlotRequestId(), pendingRequest); + } + } + + Iterator<PhysicalSlot> freeSlotsIterator = freeSlots.values().iterator(); + // Match slots and pending requests based on preferred allocation + while (freeSlotsIterator.hasNext() && !pendingRequestsWithPreferredAllocations.isEmpty()) 
{ + final PhysicalSlot freeSlot = freeSlotsIterator.next(); + + final Iterator<PendingRequest> pendingRequestIterator = + pendingRequestsWithPreferredAllocations.values().iterator(); + + while (pendingRequestIterator.hasNext()) { + final PendingRequest pendingRequest = pendingRequestIterator.next(); + + if (freeSlot.getResourceProfile().isMatching(pendingRequest.getResourceProfile()) + && pendingRequest + .getPreferredAllocations() + .contains(freeSlot.getAllocationId())) { + requestSlotMatchings.add( + RequestSlotMatching.createFor(pendingRequest, freeSlot)); + pendingRequestIterator.remove(); + freeSlotsIterator.remove(); + break; + } + } + } + + unmatchedRequests.addAll(pendingRequestsWithPreferredAllocations.values()); + if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) { + requestSlotMatchings.addAll( + SimpleRequestSlotMatchingStrategy.INSTANCE.matchRequestsAndSlots( Review comment: 👍 ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmaster.SlotRequestId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * {@link RequestSlotMatchingStrategy} that takes the preferred allocations into account. The + * strategy will try to fulfill the preferred allocations and if this is not possible, then it will + * fall back to {@link SimpleRequestSlotMatchingStrategy}. + */ +public enum PreferredAllocationRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy { + INSTANCE; + + @Override + public Collection<RequestSlotMatching> matchRequestsAndSlots( + Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) { + final Collection<RequestSlotMatching> requestSlotMatchings = new ArrayList<>(); + + final Map<AllocationID, PhysicalSlot> freeSlots = + slots.stream() + .collect( + Collectors.toMap( + PhysicalSlot::getAllocationId, Function.identity())); + + final Map<SlotRequestId, PendingRequest> pendingRequestsWithPreferredAllocations = + new HashMap<>(); + final List<PendingRequest> unmatchedRequests = new ArrayList<>(); + + // Split requests into those that have preferred allocations and those that don't have + for (PendingRequest pendingRequest : pendingRequests) { + if (pendingRequest.getPreferredAllocations().isEmpty()) { + unmatchedRequests.add(pendingRequest); + } else { + pendingRequestsWithPreferredAllocations.put( + pendingRequest.getSlotRequestId(), pendingRequest); + } + } + + Iterator<PhysicalSlot> freeSlotsIterator = freeSlots.values().iterator(); Review comment: nit ```suggestion final Iterator<PhysicalSlot> freeSlotsIterator = freeSlots.values().iterator(); ``` ########## File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java ########## @@ -184,6 +189,25 @@ public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration( slotPoolServiceFactory, schedulerNGFactory); } + private static RequestSlotMatchingStrategy getRequestSlotMatchingStrategy( Review comment: Should we have a test for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
