This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63cf221ca2d963aa1394fb0244a8702b9e8c3835 Author: Zhu Zhu <[email protected]> AuthorDate: Thu Mar 11 16:31:55 2021 +0800 [FLINK-19142][runtime] Use LocationPreferenceSlotSelectionStrategy for batch jobs even if local recovery is enabled --- .../scheduler/DefaultSchedulerComponents.java | 31 +++++++++++++++++----- .../DefaultSchedulerComponentsFactoryTest.java | 30 +++++++++++++++++++++ 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java index d40f8aa..0898c84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ClusterOptions; @@ -37,6 +38,9 @@ import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStra import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.util.clock.SystemClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.function.Consumer; import static org.apache.flink.util.Preconditions.checkArgument; @@ -46,6 +50,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * PipelinedRegionSchedulingStrategy}. */ public class DefaultSchedulerComponents { + private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulerComponents.class); private final SchedulingStrategyFactory schedulingStrategyFactory; private final Consumer<ComponentMainThreadExecutor> startUpAction; @@ -94,7 +99,7 @@ public class DefaultSchedulerComponents { final Time slotRequestTimeout) { final SlotSelectionStrategy slotSelectionStrategy = - selectSlotSelectionStrategy(jobMasterConfiguration); + selectSlotSelectionStrategy(jobType, jobMasterConfiguration); final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool( slotPool, SystemClock.getInstance()); @@ -112,8 +117,9 @@ public class DefaultSchedulerComponents { allocatorFactory); } - private static SlotSelectionStrategy selectSlotSelectionStrategy( - final Configuration configuration) { + @VisibleForTesting + static SlotSelectionStrategy selectSlotSelectionStrategy( + final JobType jobType, final Configuration configuration) { final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY); @@ -124,9 +130,20 @@ public class DefaultSchedulerComponents { ? LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut() : LocationPreferenceSlotSelectionStrategy.createDefault(); - return configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY) - ? PreviousAllocationSlotSelectionStrategy.create( - locationPreferenceSlotSelectionStrategy) - : locationPreferenceSlotSelectionStrategy; + final boolean isLocalRecoveryEnabled = + configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY); + if (isLocalRecoveryEnabled) { + if (jobType == JobType.STREAMING) { + return PreviousAllocationSlotSelectionStrategy.create( + locationPreferenceSlotSelectionStrategy); + } else { + LOG.warn( + "Batch job does not support local recovery. Falling back to use " + + locationPreferenceSlotSelectionStrategy.getClass()); + return locationPreferenceSlotSelectionStrategy; + } + } else { + return locationPreferenceSlotSelectionStrategy; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java index f459c54..85e225b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java @@ -20,10 +20,14 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils; +import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy; import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; import org.apache.flink.util.TestLogger; @@ -77,6 +81,32 @@ public class DefaultSchedulerComponentsFactoryTest extends TestLogger { } } + @Test + public void testCreatePreviousAllocationSlotSelectionStrategyForLocalRecoveryStreamingJob() { + final Configuration configuration = new Configuration(); + configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true); + + final SlotSelectionStrategy slotSelectionStrategy = + DefaultSchedulerComponents.selectSlotSelectionStrategy( + JobType.STREAMING, configuration); + + assertThat( + slotSelectionStrategy, instanceOf(PreviousAllocationSlotSelectionStrategy.class)); + } + + @Test + public void testCreateLocationPreferenceSlotSelectionStrategyForLocalRecoveryBatchJob() { + final Configuration configuration = new Configuration(); + configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true); + + final SlotSelectionStrategy slotSelectionStrategy = + DefaultSchedulerComponents.selectSlotSelectionStrategy( + JobType.BATCH, configuration); + + assertThat( + slotSelectionStrategy, instanceOf(LocationPreferenceSlotSelectionStrategy.class)); + } + private static DefaultSchedulerComponents createSchedulerComponents( final Configuration configuration) { return createSchedulerComponents(configuration, false, JobType.BATCH);
