This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5188a1b36953c3cfb889c5866c4835cc3b5dcb5
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Nov 27 12:47:21 2020 +0100

    [FLINK-20382][runtime] Fail hard when JobMaster cannot start scheduling of a job
    
    This commit checks whether the SchedulerNG.startScheduling method failed. If so,
    we fail hard by terminating the process.
    
    This closes #14252.
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   2 +-
 .../runtime/jobmaster/JobMasterSchedulerTest.java  | 118 +++++++++++++++++++++
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |  17 ++-
 3 files changed, 133 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 504fa0e..7f5018d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -932,7 +932,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        );
                }
 
-               schedulerAssignedFuture.thenRun(this::startScheduling);
+               
FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling));
        }
 
        private void startScheduling() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
new file mode 100644
index 0000000..e87e1a1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the JobMaster scheduler interaction.
+ */
+public class JobMasterSchedulerTest extends TestLogger {
+
+       @ClassRule
+       public static final TestingRpcServiceResource 
TESTING_RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();
+
+       private final Duration testingTimeout = Duration.ofSeconds(10L);
+
+
+       /**
+        * Tests that we fail fatally if we cannot start the scheduling. See 
FLINK-20382.
+        */
+       @Test
+       public void testIfStartSchedulingFailsJobMasterFailsFatally() throws 
Exception {
+               final SystemExitTrackingSecurityManager trackingSecurityManager 
= new SystemExitTrackingSecurityManager();
+               System.setSecurityManager(trackingSecurityManager);
+
+               final SchedulerNGFactory schedulerFactory = new 
FailingSchedulerFactory();
+               final JobMaster jobMaster = new JobMasterBuilder(new 
JobGraph(), TESTING_RPC_SERVICE_RESOURCE
+                       .getTestingRpcService())
+                       .withSchedulerFactory(schedulerFactory)
+                       .createJobMaster();
+
+               final CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(JobMasterId.generate());
+
+               try {
+                       startFuture.join();
+
+                       
assertThat(trackingSecurityManager.getSystemExitFuture().join(), is(-17));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
Time.milliseconds(testingTimeout.toMillis()));
+                       System.setSecurityManager(null);
+               }
+       }
+
+       private static final class FailingSchedulerFactory implements 
SchedulerNGFactory {
+               @Override
+               public SchedulerNG createInstance(
+                       Logger log,
+                       JobGraph jobGraph,
+                       BackPressureStatsTracker backPressureStatsTracker,
+                       Executor ioExecutor,
+                       Configuration jobMasterConfiguration,
+                       SlotPool slotPool,
+                       ScheduledExecutorService futureExecutor,
+                       ClassLoader userCodeLoader,
+                       CheckpointRecoveryFactory checkpointRecoveryFactory,
+                       Time rpcTimeout,
+                       BlobWriter blobWriter,
+                       JobManagerJobMetricGroup jobManagerJobMetricGroup,
+                       Time slotRequestTimeout,
+                       ShuffleMaster<?> shuffleMaster,
+                       JobMasterPartitionTracker partitionTracker,
+                       ExecutionDeploymentTracker executionDeploymentTracker,
+                       long initializationTimestamp) {
+                       return TestingSchedulerNG.newBuilder()
+                               .setStartSchedulingRunnable(() -> {
+                                       throw new FlinkRuntimeException("Could 
not start scheduling.");
+                               })
+                               .build();
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 9ac3d06..8b7c455 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -29,10 +29,10 @@ import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTrack
 import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentReconciler;
 import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentReconciler;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentReconciler;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
@@ -42,9 +42,12 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -77,6 +80,9 @@ public class JobMasterBuilder {
 
        private ResourceID jmResourceId = ResourceID.generate();
 
+       @Nullable
+       private SchedulerNGFactory schedulerFactory = null;
+
        private FatalErrorHandler fatalErrorHandler = error -> {
        };
 
@@ -157,6 +163,11 @@ public class JobMasterBuilder {
                return this;
        }
 
+       public JobMasterBuilder withSchedulerFactory(SchedulerNGFactory 
schedulerFactory) {
+               this.schedulerFactory = schedulerFactory;
+               return this;
+       }
+
        public JobMaster createJobMaster() throws Exception {
                final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
 
@@ -173,7 +184,7 @@ public class JobMasterBuilder {
                        onCompletionActions,
                        fatalErrorHandler,
                        JobMasterBuilder.class.getClassLoader(),
-                       
SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration),
+                       schedulerFactory != null ? schedulerFactory : 
SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration),
                        shuffleMaster,
                        partitionTrackerFactory,
                        executionDeploymentTracker,

Reply via email to