This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a1f7a5c1cf480826e910e6c7ed7645d89d479c30 Author: Zhu Zhu <[email protected]> AuthorDate: Fri Mar 12 11:57:40 2021 +0800 [hotfix][runtime] Refactor testing methods out from SlotProfile into SlotProfileTestingUtils --- .../clusterframework/types/SlotProfile.java | 38 -------------- ...ocationPreferenceSlotSelectionStrategyTest.java | 15 +++--- .../types/SlotProfileTestingUtils.java | 61 ++++++++++++++++++++++ ...lSlotProviderImplWithSpreadOutStrategyTest.java | 4 +- .../slotpool/PhysicalSlotProviderResource.java | 6 ++- .../SlotSharingExecutionSlotAllocatorTest.java | 3 +- 6 files changed, 78 insertions(+), 49 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java index 890ac89..d818c67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java @@ -18,12 +18,10 @@ package org.apache.flink.runtime.clusterframework.types; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import java.util.Collection; -import java.util.Collections; import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -37,10 +35,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * SlotContext} against the slot profile and, potentially, further requirements. */ public class SlotProfile { - - /** Singleton object for a slot profile without any requirements. */ - private static final SlotProfile NO_REQUIREMENTS = noLocality(ResourceProfile.UNKNOWN); - /** This specifies the desired resource profile for the task slot. 
*/ private final ResourceProfile taskResourceProfile; @@ -99,38 +93,6 @@ public class SlotProfile { return previousExecutionGraphAllocations; } - /** Returns a slot profile that has no requirements. */ - @VisibleForTesting - public static SlotProfile noRequirements() { - return NO_REQUIREMENTS; - } - - /** Returns a slot profile for the given resource profile, without any locality requirements. */ - @VisibleForTesting - public static SlotProfile noLocality(ResourceProfile resourceProfile) { - return preferredLocality(resourceProfile, Collections.emptyList()); - } - - /** - * Returns a slot profile for the given resource profile and the preferred locations. - * - * @param resourceProfile specifying the slot requirements - * @param preferredLocations specifying the preferred locations - * @return Slot profile with the given resource profile and preferred locations - */ - @VisibleForTesting - public static SlotProfile preferredLocality( - final ResourceProfile resourceProfile, - final Collection<TaskManagerLocation> preferredLocations) { - - return priorAllocation( - resourceProfile, - resourceProfile, - preferredLocations, - Collections.emptyList(), - Collections.emptySet()); - } - /** * Returns a slot profile for the given resource profile, prior allocations and all prior * allocation ids from the whole execution graph. 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java index 1fb25c9..e6caaaf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java @@ -88,7 +88,7 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt @Test public void matchNoRequirements() { - SlotProfile slotProfile = SlotProfile.noRequirements(); + SlotProfile slotProfile = SlotProfileTestingUtils.noRequirements(); Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); Assert.assertTrue(match.isPresent()); @@ -101,7 +101,8 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt public void returnsHostLocalMatchingIfExactTMLocationCannotBeFulfilled() { SlotProfile slotProfile = - SlotProfile.preferredLocality(resourceProfile, Collections.singletonList(tmlX)); + SlotProfileTestingUtils.preferredLocality( + resourceProfile, Collections.singletonList(tmlX)); Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); Assert.assertTrue(match.isPresent()); @@ -129,7 +130,7 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt new TaskManagerLocation( new ResourceID("non-local-tm"), nonHostLocalInetAddress, 42); SlotProfile slotProfile = - SlotProfile.preferredLocality( + SlotProfileTestingUtils.preferredLocality( resourceProfile, Collections.singletonList(nonLocalTm)); Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); @@ -143,19 +144,21 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt 
public void matchPreferredLocation() { SlotProfile slotProfile = - SlotProfile.preferredLocality( + SlotProfileTestingUtils.preferredLocality( biggerResourceProfile, Collections.singletonList(tml2)); Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile); Assert.assertEquals(slotInfo2, match.get().getSlotInfo()); - slotProfile = SlotProfile.preferredLocality(resourceProfile, Arrays.asList(tmlX, tml4)); + slotProfile = + SlotProfileTestingUtils.preferredLocality( + resourceProfile, Arrays.asList(tmlX, tml4)); match = runMatching(slotProfile); Assert.assertEquals(slotInfo4, match.get().getSlotInfo()); slotProfile = - SlotProfile.preferredLocality( + SlotProfileTestingUtils.preferredLocality( resourceProfile, Arrays.asList(tml3, tml1, tml3, tmlX)); match = runMatching(slotProfile); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTestingUtils.java new file mode 100755 index 0000000..cc7ab4e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTestingUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; + +/** Testing utils for {@link SlotProfile}. */ +public class SlotProfileTestingUtils { + + /** Returns a slot profile that has no requirements. */ + @VisibleForTesting + public static SlotProfile noRequirements() { + return noLocality(ResourceProfile.UNKNOWN); + } + + /** Returns a slot profile for the given resource profile, without any locality requirements. */ + @VisibleForTesting + public static SlotProfile noLocality(ResourceProfile resourceProfile) { + return preferredLocality(resourceProfile, Collections.emptyList()); + } + + /** + * Returns a slot profile for the given resource profile and the preferred locations. + * + * @param resourceProfile specifying the slot requirements + * @param preferredLocations specifying the preferred locations + * @return Slot profile with the given resource profile and preferred locations + */ + @VisibleForTesting + public static SlotProfile preferredLocality( + final ResourceProfile resourceProfile, + final Collection<TaskManagerLocation> preferredLocations) { + + return SlotProfile.priorAllocation( + resourceProfile, + resourceProfile, + preferredLocations, + Collections.emptyList(), + Collections.emptySet()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java index 1a2514a..1d77b9e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; @@ -84,7 +84,7 @@ public class PhysicalSlotProviderImplWithSpreadOutStrategyTest extends TestLogge PhysicalSlotRequest request1 = new PhysicalSlotRequest( new SlotRequestId(), - SlotProfile.preferredLocality( + SlotProfileTestingUtils.preferredLocality( ResourceProfile.ANY, Collections.singleton(preferredTaskManagerLocation)), false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java index 1cf39f3..6c5477b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.jobmaster.SlotRequestId; @@ -89,7 +89,9 @@ public class PhysicalSlotProviderResource extends ExternalResource { 
public PhysicalSlotRequest createSimpleRequest() { return new PhysicalSlotRequest( - new SlotRequestId(), SlotProfile.noLocality(ResourceProfile.UNKNOWN), false); + new SlotRequestId(), + SlotProfileTestingUtils.noLocality(ResourceProfile.UNKNOWN), + false); } public ComponentMainThreadExecutor getMainThreadExecutor() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java index 617abfc..cee964a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingPayload; @@ -655,7 +656,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { askedBulks.add(bulk); return (group, resourceProfile) -> { askedGroups.add(group); - return SlotProfile.noLocality(resourceProfile); + return SlotProfileTestingUtils.noLocality(resourceProfile); }; }
