This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c4545e06a54d3694ad106926eae2c592e01473b8
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Mar 14 12:57:39 2021 +0100

    [FLINK-21602] Let AdaptiveScheduler create the ExecutionGraph in the 
ioExecutor
    
    The AdaptiveScheduler now creates the ExecutionGraph in the ioExecutor.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 19 +++++-
 .../DefaultSchedulerBatchSchedulingTest.java       | 19 ------
 .../GloballyTerminalJobStatusListener.java         | 44 ++++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 68 ++++++++++++++++------
 4 files changed, 111 insertions(+), 39 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 1c5763e..f03e162 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -112,6 +112,7 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -123,6 +124,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -730,7 +732,8 @@ public class AdaptiveScheduler
             
vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
         }
 
-        final ExecutionGraph executionGraph = 
createExecutionGraphAndRestoreState(adjustedJobGraph);
+        final ExecutionGraph executionGraph =
+                
createExecutionGraphAndRestoreStateAsync(adjustedJobGraph).join();
 
         executionGraph.start(componentMainThreadExecutor);
         executionGraph.transitionToRunning();
@@ -749,6 +752,20 @@ public class AdaptiveScheduler
         return executionGraph;
     }
 
+    private CompletableFuture<ExecutionGraph> 
createExecutionGraphAndRestoreStateAsync(
+            JobGraph adjustedJobGraph) {
+        return CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return 
createExecutionGraphAndRestoreState(adjustedJobGraph);
+                    } catch (Exception exception) {
+                        throw new CompletionException(exception);
+                    }
+                },
+                ioExecutor);
+    }
+
+    @Nonnull
     private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph 
adjustedJobGraph)
             throws Exception {
         ExecutionDeploymentListener executionDeploymentListener =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
index 34cdfd2..a0c428a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -189,24 +188,6 @@ public class DefaultSchedulerBatchSchedulingTest extends 
TestLogger {
         return JobGraphTestUtils.batchJobGraph(jobVertex);
     }
 
-    private static class GloballyTerminalJobStatusListener implements 
JobStatusListener {
-
-        private final CompletableFuture<JobStatus> 
globallyTerminalJobStatusFuture =
-                new CompletableFuture<>();
-
-        @Override
-        public void jobStatusChanges(
-                JobID jobId, JobStatus newJobStatus, long timestamp, Throwable 
error) {
-            if (newJobStatus.isGloballyTerminalState()) {
-                globallyTerminalJobStatusFuture.complete(newJobStatus);
-            }
-        }
-
-        public CompletableFuture<JobStatus> getTerminationFuture() {
-            return globallyTerminalJobStatusFuture;
-        }
-    }
-
     private SchedulerNG createScheduler(
             JobGraph jobGraph,
             ComponentMainThreadExecutor mainThreadExecutor,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
new file mode 100644
index 0000000..dcc2a1b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+
+import java.util.concurrent.CompletableFuture;
+
+/** {@link JobStatusListener} which records a globally terminal {@link 
JobStatus}. */
+public class GloballyTerminalJobStatusListener implements JobStatusListener {
+
+    private final CompletableFuture<JobStatus> globallyTerminalJobStatusFuture 
=
+            new CompletableFuture<>();
+
+    @Override
+    public void jobStatusChanges(
+            JobID jobId, JobStatus newJobStatus, long timestamp, Throwable 
error) {
+        if (newJobStatus.isGloballyTerminalState()) {
+            globallyTerminalJobStatusFuture.complete(newJobStatus);
+        }
+    }
+
+    public CompletableFuture<JobStatus> getTerminationFuture() {
+        return globallyTerminalJobStatusFuture;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 4958a37..0489e77 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -35,6 +35,8 @@ import 
org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -67,6 +69,7 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.GloballyTerminalJobStatusListener;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -90,6 +93,8 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -722,25 +727,50 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
 
-        final AdaptiveScheduler adaptiveScheduler =
-                new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, 
mainThreadExecutor)
-                        .setDeclarativeSlotPool(declarativeSlotPool)
-                        .build();
-
-        adaptiveScheduler.startScheduling();
-
-        offerSlots(
-                declarativeSlotPool,
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
1)));
-
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                adaptiveScheduler.requestJob().getArchivedExecutionGraph();
-
-        assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
-        assertThat(
-                archivedExecutionGraph.getFailureInfo().getException(),
-                FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
+        final GloballyTerminalJobStatusListener jobStatusListener =
+                new GloballyTerminalJobStatusListener();
+
+        final ScheduledExecutorService singleThreadExecutor =
+                Executors.newSingleThreadScheduledExecutor();
+
+        try {
+            final ComponentMainThreadExecutor singleMainThreadExecutor =
+                    
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                            singleThreadExecutor);
+            final AdaptiveScheduler adaptiveScheduler =
+                    new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, 
singleMainThreadExecutor)
+                            .setDeclarativeSlotPool(declarativeSlotPool)
+                            .setJobStatusListener(jobStatusListener)
+                            .build();
+
+            adaptiveScheduler.startScheduling();
+
+            singleMainThreadExecutor.execute(
+                    () ->
+                            offerSlots(
+                                    declarativeSlotPool,
+                                    createSlotOffersForResourceRequirements(
+                                            ResourceCounter.withResource(
+                                                    ResourceProfile.UNKNOWN, 
1))));
+
+            assertThat(jobStatusListener.getTerminationFuture().join(), 
is(JobStatus.FAILED));
+
+            final ArchivedExecutionGraph archivedExecutionGraph =
+                    CompletableFuture.supplyAsync(
+                                    () ->
+                                            adaptiveScheduler
+                                                    .requestJob()
+                                                    
.getArchivedExecutionGraph(),
+                                    singleMainThreadExecutor)
+                            .join();
+
+            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+            assertThat(
+                    archivedExecutionGraph.getFailureInfo().getException(),
+                    FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
+        } finally {
+            singleThreadExecutor.shutdownNow();
+        }
     }
 
     @Test

Reply via email to