This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63cf221ca2d963aa1394fb0244a8702b9e8c3835
Author: Zhu Zhu <[email protected]>
AuthorDate: Thu Mar 11 16:31:55 2021 +0800

    [FLINK-19142][runtime] Use LocationPreferenceSlotSelectionStrategy for batch jobs even if local recovery is enabled
---
 .../scheduler/DefaultSchedulerComponents.java      | 31 +++++++++++++++++-----
 .../DefaultSchedulerComponentsFactoryTest.java     | 30 +++++++++++++++++++++
 2 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
index d40f8aa..0898c84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
@@ -37,6 +38,9 @@ import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStra
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.util.clock.SystemClock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -46,6 +50,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * PipelinedRegionSchedulingStrategy}.
  */
 public class DefaultSchedulerComponents {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSchedulerComponents.class);
 
     private final SchedulingStrategyFactory schedulingStrategyFactory;
     private final Consumer<ComponentMainThreadExecutor> startUpAction;
@@ -94,7 +99,7 @@ public class DefaultSchedulerComponents {
             final Time slotRequestTimeout) {
 
         final SlotSelectionStrategy slotSelectionStrategy =
-                selectSlotSelectionStrategy(jobMasterConfiguration);
+                selectSlotSelectionStrategy(jobType, jobMasterConfiguration);
         final PhysicalSlotRequestBulkChecker bulkChecker =
                 PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
                         slotPool, SystemClock.getInstance());
@@ -112,8 +117,9 @@ public class DefaultSchedulerComponents {
                 allocatorFactory);
     }
 
-    private static SlotSelectionStrategy selectSlotSelectionStrategy(
-            final Configuration configuration) {
+    @VisibleForTesting
+    static SlotSelectionStrategy selectSlotSelectionStrategy(
+            final JobType jobType, final Configuration configuration) {
         final boolean evenlySpreadOutSlots =
                 
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
 
@@ -124,9 +130,20 @@ public class DefaultSchedulerComponents {
                         ? 
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
                         : 
LocationPreferenceSlotSelectionStrategy.createDefault();
 
-        return configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)
-                ? PreviousAllocationSlotSelectionStrategy.create(
-                        locationPreferenceSlotSelectionStrategy)
-                : locationPreferenceSlotSelectionStrategy;
+        final boolean isLocalRecoveryEnabled =
+                configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
+        if (isLocalRecoveryEnabled) {
+            if (jobType == JobType.STREAMING) {
+                return PreviousAllocationSlotSelectionStrategy.create(
+                        locationPreferenceSlotSelectionStrategy);
+            } else {
+                LOG.warn(
+                        "Batch job does not support local recovery. Falling 
back to use "
+                                + 
locationPreferenceSlotSelectionStrategy.getClass());
+                return locationPreferenceSlotSelectionStrategy;
+            }
+        } else {
+            return locationPreferenceSlotSelectionStrategy;
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
index f459c54..85e225b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
@@ -20,10 +20,14 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobType;
+import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import 
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.util.TestLogger;
 
@@ -77,6 +81,32 @@ public class DefaultSchedulerComponentsFactoryTest extends TestLogger {
         }
     }
 
+    @Test
+    public void 
testCreatePreviousAllocationSlotSelectionStrategyForLocalRecoveryStreamingJob() 
{
+        final Configuration configuration = new Configuration();
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+
+        final SlotSelectionStrategy slotSelectionStrategy =
+                DefaultSchedulerComponents.selectSlotSelectionStrategy(
+                        JobType.STREAMING, configuration);
+
+        assertThat(
+                slotSelectionStrategy, 
instanceOf(PreviousAllocationSlotSelectionStrategy.class));
+    }
+
+    @Test
+    public void 
testCreateLocationPreferenceSlotSelectionStrategyForLocalRecoveryBatchJob() {
+        final Configuration configuration = new Configuration();
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+
+        final SlotSelectionStrategy slotSelectionStrategy =
+                DefaultSchedulerComponents.selectSlotSelectionStrategy(
+                        JobType.BATCH, configuration);
+
+        assertThat(
+                slotSelectionStrategy, 
instanceOf(LocationPreferenceSlotSelectionStrategy.class));
+    }
+
     private static DefaultSchedulerComponents createSchedulerComponents(
             final Configuration configuration) {
         return createSchedulerComponents(configuration, false, JobType.BATCH);

Reply via email to