This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c5188a1b36953c3cfb889c5866c4835cc3b5dcb5 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Nov 27 12:47:21 2020 +0100 [FLINK-20382][runtime] Fail hard when JobMaster cannot start scheduling of a job This commit checks whether the SchedulerNG.startScheduling method failed. If this is the case, then we fail hard by terminating the process. This closes #14252. --- .../apache/flink/runtime/jobmaster/JobMaster.java | 2 +- .../runtime/jobmaster/JobMasterSchedulerTest.java | 118 +++++++++++++++++++++ .../runtime/jobmaster/utils/JobMasterBuilder.java | 17 ++- 3 files changed, 133 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 504fa0e..7f5018d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -932,7 +932,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast ); } - schedulerAssignedFuture.thenRun(this::startScheduling); + FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling)); } private void startScheduling() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java new file mode 100644 index 0000000..e87e1a1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; +import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcServiceResource; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.runtime.scheduler.SchedulerNGFactory; +import org.apache.flink.runtime.scheduler.TestingSchedulerNG; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; + +import java.time.Duration; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the JobMaster scheduler interaction. + */ +public class JobMasterSchedulerTest extends TestLogger { + + @ClassRule + public static final TestingRpcServiceResource TESTING_RPC_SERVICE_RESOURCE = new TestingRpcServiceResource(); + + private final Duration testingTimeout = Duration.ofSeconds(10L); /* Only bounds the JobMaster termination in the test below; the join() calls there wait without a timeout. */ + + + /** + * Tests that we fail fatally if we cannot start the scheduling. See FLINK-20382. The fatal failure is detected through the exit code reported by the installed SystemExitTrackingSecurityManager. + */ + @Test + public void testIfStartSchedulingFailsJobMasterFailsFatally() throws Exception { + final SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager(); + System.setSecurityManager(trackingSecurityManager); /* NOTE(review): installed before the try block below, so if createJobMaster() or start() throws, this manager is never uninstalled. */ + + final SchedulerNGFactory schedulerFactory = new FailingSchedulerFactory(); + final JobMaster jobMaster = new JobMasterBuilder(new JobGraph(), TESTING_RPC_SERVICE_RESOURCE + .getTestingRpcService()) + .withSchedulerFactory(schedulerFactory) + .createJobMaster(); + + final CompletableFuture<Acknowledge> startFuture = jobMaster.start(JobMasterId.generate()); + + try { + startFuture.join(); + + assertThat(trackingSecurityManager.getSystemExitFuture().join(), is(-17)); /* NOTE(review): -17 is a magic number, presumably the fatal-exit code used on the FutureUtils.assertNoException path - confirm and prefer the named constant. join() has no timeout, so the test presumably blocks forever if System.exit is never called. */ + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, Time.milliseconds(testingTimeout.toMillis())); + System.setSecurityManager(null); /* NOTE(review): resets to null rather than to the SecurityManager that was installed before this test. */ + } + } + + private static final class FailingSchedulerFactory implements SchedulerNGFactory { /* Ignores all factory arguments and returns a TestingSchedulerNG whose start-scheduling runnable throws a FlinkRuntimeException. */ + @Override + public SchedulerNG createInstance( + Logger log, + JobGraph jobGraph, + BackPressureStatsTracker backPressureStatsTracker, + Executor ioExecutor, + Configuration jobMasterConfiguration, + SlotPool slotPool, + ScheduledExecutorService futureExecutor, + ClassLoader userCodeLoader, + CheckpointRecoveryFactory checkpointRecoveryFactory, + Time rpcTimeout, + BlobWriter blobWriter, +
JobManagerJobMetricGroup jobManagerJobMetricGroup, + Time slotRequestTimeout, + ShuffleMaster<?> shuffleMaster, + JobMasterPartitionTracker partitionTracker, + ExecutionDeploymentTracker executionDeploymentTracker, + long initializationTimestamp) { + return TestingSchedulerNG.newBuilder() + .setStartSchedulingRunnable(() -> { + throw new FlinkRuntimeException("Could not start scheduling."); + }) + .build(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java index 9ac3d06..8b7c455 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java @@ -29,10 +29,10 @@ import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTrack import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; -import org.apache.flink.runtime.jobmaster.ExecutionDeploymentReconciler; import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentReconciler; -import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; +import org.apache.flink.runtime.jobmaster.ExecutionDeploymentReconciler; +import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterConfiguration; @@ -42,9 +42,12 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.rpc.FatalErrorHandler; import
org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.scheduler.SchedulerNGFactory; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; /** @@ -77,6 +80,9 @@ public class JobMasterBuilder { private ResourceID jmResourceId = ResourceID.generate(); + @Nullable + private SchedulerNGFactory schedulerFactory = null; + private FatalErrorHandler fatalErrorHandler = error -> { }; @@ -157,6 +163,11 @@ public class JobMasterBuilder { return this; } + public JobMasterBuilder withSchedulerFactory(SchedulerNGFactory schedulerFactory) { + this.schedulerFactory = schedulerFactory; + return this; + } + public JobMaster createJobMaster() throws Exception { final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); @@ -173,7 +184,7 @@ public class JobMasterBuilder { onCompletionActions, fatalErrorHandler, JobMasterBuilder.class.getClassLoader(), - SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration), + schedulerFactory != null ? schedulerFactory : SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration), shuffleMaster, partitionTrackerFactory, executionDeploymentTracker,
