This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c4545e06a54d3694ad106926eae2c592e01473b8 Author: Till Rohrmann <[email protected]> AuthorDate: Sun Mar 14 12:57:39 2021 +0100 [FLINK-21602] Let AdaptiveScheduler create the ExecutionGraph in the ioExecutor The AdaptiveScheduler now creates the ExecutionGraph in the ioExecutor. --- .../scheduler/adaptive/AdaptiveScheduler.java | 19 +++++- .../DefaultSchedulerBatchSchedulingTest.java | 19 ------ .../GloballyTerminalJobStatusListener.java | 44 ++++++++++++++ .../scheduler/adaptive/AdaptiveSchedulerTest.java | 68 ++++++++++++++++------ 4 files changed, 111 insertions(+), 39 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 1c5763e..f03e162 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -112,6 +112,7 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -123,6 +124,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -730,7 +732,8 @@ public class AdaptiveScheduler vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID())); } - final ExecutionGraph executionGraph = createExecutionGraphAndRestoreState(adjustedJobGraph); + final ExecutionGraph executionGraph = + createExecutionGraphAndRestoreStateAsync(adjustedJobGraph).join(); executionGraph.start(componentMainThreadExecutor); 
executionGraph.transitionToRunning(); @@ -749,6 +752,20 @@ public class AdaptiveScheduler return executionGraph; } + private CompletableFuture<ExecutionGraph> createExecutionGraphAndRestoreStateAsync( + JobGraph adjustedJobGraph) { + return CompletableFuture.supplyAsync( + () -> { + try { + return createExecutionGraphAndRestoreState(adjustedJobGraph); + } catch (Exception exception) { + throw new CompletionException(exception); + } + }, + ioExecutor); + } + + @Nonnull private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph) throws Exception { ExecutionDeploymentListener executionDeploymentListener = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java index 34cdfd2..a0c428a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -189,24 +188,6 @@ public class DefaultSchedulerBatchSchedulingTest extends TestLogger { return JobGraphTestUtils.batchJobGraph(jobVertex); } - private static class GloballyTerminalJobStatusListener implements JobStatusListener { - - private final CompletableFuture<JobStatus> globallyTerminalJobStatusFuture = - new CompletableFuture<>(); - - @Override - public void jobStatusChanges( - JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) { - if (newJobStatus.isGloballyTerminalState()) { - globallyTerminalJobStatusFuture.complete(newJobStatus); - } - } - - public CompletableFuture<JobStatus> 
getTerminationFuture() { - return globallyTerminalJobStatusFuture; - } - } - private SchedulerNG createScheduler( JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java new file mode 100644 index 0000000..dcc2a1b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.JobStatusListener; + +import java.util.concurrent.CompletableFuture; + +/** {@link JobStatusListener} which records a globally terminal {@link JobStatus}. 
*/ +public class GloballyTerminalJobStatusListener implements JobStatusListener { + + private final CompletableFuture<JobStatus> globallyTerminalJobStatusFuture = + new CompletableFuture<>(); + + @Override + public void jobStatusChanges( + JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) { + if (newJobStatus.isGloballyTerminalState()) { + globallyTerminalJobStatusFuture.complete(newJobStatus); + } + } + + public CompletableFuture<JobStatus> getTerminationFuture() { + return globallyTerminalJobStatusFuture; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 4958a37..0489e77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -35,6 +35,8 @@ import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; @@ -67,6 +69,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.operators.coordination.TestOperatorEvent; import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.scheduler.GloballyTerminalJobStatusListener; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -90,6 +93,8 @@ import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -722,25 +727,50 @@ public class AdaptiveSchedulerTest extends TestLogger { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID()); - final AdaptiveScheduler adaptiveScheduler = - new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, mainThreadExecutor) - .setDeclarativeSlotPool(declarativeSlotPool) - .build(); - - adaptiveScheduler.startScheduling(); - - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1))); - - final ArchivedExecutionGraph archivedExecutionGraph = - adaptiveScheduler.requestJob().getArchivedExecutionGraph(); - - assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); - assertThat( - archivedExecutionGraph.getFailureInfo().getException(), - FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint")); + final GloballyTerminalJobStatusListener jobStatusListener = + new GloballyTerminalJobStatusListener(); + + final ScheduledExecutorService singleThreadExecutor = + Executors.newSingleThreadScheduledExecutor(); + + try { + final ComponentMainThreadExecutor singleMainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + 
singleThreadExecutor); + final AdaptiveScheduler adaptiveScheduler = + new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, singleMainThreadExecutor) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobStatusListener(jobStatusListener) + .build(); + + adaptiveScheduler.startScheduling(); + + singleMainThreadExecutor.execute( + () -> + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, 1)))); + + assertThat(jobStatusListener.getTerminationFuture().join(), is(JobStatus.FAILED)); + + final ArchivedExecutionGraph archivedExecutionGraph = + CompletableFuture.supplyAsync( + () -> + adaptiveScheduler + .requestJob() + .getArchivedExecutionGraph(), + singleMainThreadExecutor) + .join(); + + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + assertThat( + archivedExecutionGraph.getFailureInfo().getException(), + FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint")); + } finally { + singleThreadExecutor.shutdownNow(); + } } @Test
