This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 26a0012b14a [FLINK-38892][runtime/rest] Introduce the scheduler and 
type information of the target job in the JobDetails(Info).java (#27544)
26a0012b14a is described below

commit 26a0012b14af1adba2a4772bfc257e4f6196b9b8
Author: Yuepeng Pan <[email protected]>
AuthorDate: Fri Feb 27 17:55:24 2026 +0800

    [FLINK-38892][runtime/rest] Introduce the scheduler and type information of 
the target job in the JobDetails(Info).java (#27544)
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 12 ++++++
 docs/static/generated/rest_v1_dispatcher.yml       | 12 ++++++
 .../client/program/rest/RestClusterClientTest.java |  1 +
 .../webmonitor/history/HistoryServerTest.java      |  3 +-
 .../src/test/resources/rest_api_v1.snapshot        | 20 +++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       |  5 +--
 .../dispatcher/FileArchivedApplicationStore.java   |  4 +-
 .../dispatcher/MemoryArchivedApplicationStore.java |  1 -
 .../cleanup/CheckpointResourcesCleanupRunner.java  |  6 +--
 .../JobMasterServiceLeadershipRunner.java          |  6 +--
 .../webmonitor/ApplicationDetailsInfo.java         |  2 -
 .../runtime/messages/webmonitor/JobDetails.java    | 49 +++++++++++++++++++++-
 .../rest/handler/job/JobDetailsHandler.java        | 26 ++++++++----
 .../rest/handler/job/JobsOverviewHandler.java      | 10 +++--
 .../runtime/rest/messages/job/JobDetailsInfo.java  | 18 ++++++++
 .../flink/runtime/scheduler/DefaultScheduler.java  |  6 +++
 .../runtime/scheduler/ExecutionGraphInfo.java      | 23 ++++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |  7 ++++
 .../scheduler/adaptive/AdaptiveScheduler.java      |  8 +++-
 .../adaptivebatch/AdaptiveBatchScheduler.java      |  6 +++
 .../ArchivedApplicationStoreTestUtils.java         |  1 -
 .../flink/runtime/dispatcher/DispatcherTest.java   | 12 +++---
 .../rest/handler/job/JobDetailsHandlerTest.java    |  6 ++-
 .../rest/handler/job/JobsOverviewHandlerTest.java  |  6 ++-
 .../rest/messages/job/JobDetailsInfoTest.java      |  1 +
 .../runtime/scheduler/TestingSchedulerNG.java      |  6 +++
 27 files changed, 215 insertions(+), 46 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 782750b9e88..f7528c81261 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1425,6 +1425,14 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
           "name" : {
             "type" : "string"
           },
+          "jobType" : {
+            "type" : "string",
+            "enum" : [ "BATCH", "STREAMING" ]
+          },
+          "schedulerType" : {
+            "type" : "string",
+            "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+          },
           "pending-operators" : {
             "type" : "integer"
           },
@@ -1510,6 +1518,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
       "type" : "string",
       "enum" : [ "BATCH", "STREAMING" ]
     },
+    "schedulerType" : {
+      "type" : "string",
+      "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+    },
     "maxParallelism" : {
       "type" : "integer"
     },
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index df0c1258a2c..1737d17dbd1 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2511,6 +2511,10 @@ components:
           format: int64
         name:
           type: string
+        jobType:
+          $ref: "#/components/schemas/JobType"
+        schedulerType:
+          $ref: "#/components/schemas/SchedulerType"
         pending-operators:
           type: integer
           format: int32
@@ -2541,6 +2545,8 @@ components:
           $ref: "#/components/schemas/JobID"
         job-type:
           $ref: "#/components/schemas/JobType"
+        schedulerType:
+          $ref: "#/components/schemas/SchedulerType"
         maxParallelism:
           type: integer
           format: int64
@@ -3037,6 +3043,12 @@ components:
           type: string
         triggerId:
           $ref: "#/components/schemas/TriggerId"
+    SchedulerType:
+      type: string
+      enum:
+        - Default
+        - Adaptive
+        - AdaptiveBatch
     SerializedThrowable:
       type: object
       properties:
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 0c4fc83384e..ca2e1927f4b 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -1272,6 +1272,7 @@ class RestClusterClientTest {
                         false,
                         JobStatus.RUNNING,
                         JobType.STREAMING,
+                        null,
                         1,
                         2,
                         1,
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index c67a3147628..9d5f912fd26 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -922,8 +922,7 @@ class HistoryServerTest {
                         JobsOverviewHeaders.URL,
                         new MultipleJobsDetails(
                                 Collections.singleton(
-                                        JobDetails.createDetailsForJob(
-                                                
executionGraphInfo.getArchivedExecutionGraph()))));
+                                        
JobDetails.createDetailsForJob(executionGraphInfo))));
         FsJsonArchivist.writeArchivedJsons(
                 ArchivePathUtils.getJobArchivePath(clusterConfig, jobId, 
applicationId),
                 Collections.singletonList(archivedJobsOverview));
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 970bd84a287..7cebf7c7f9e 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -110,6 +110,14 @@
               "name" : {
                 "type" : "string"
               },
+              "jobType" : {
+                "type" : "string",
+                "enum" : [ "BATCH", "STREAMING" ]
+              },
+              "schedulerType" : {
+                "type" : "string",
+                "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+              },
               "start-time" : {
                 "type" : "integer"
               },
@@ -1196,6 +1204,14 @@
               "name" : {
                 "type" : "string"
               },
+              "jobType" : {
+                "type" : "string",
+                "enum" : [ "BATCH", "STREAMING" ]
+              },
+              "schedulerType" : {
+                "type" : "string",
+                "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+              },
               "start-time" : {
                 "type" : "integer"
               },
@@ -1267,6 +1283,10 @@
           "type" : "string",
           "enum" : [ "BATCH", "STREAMING" ]
         },
+        "schedulerType" : {
+          "type" : "string",
+          "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+        },
         "start-time" : {
           "type" : "integer"
         },
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b5f8cc26e19..4d79622e6b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -1164,9 +1164,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
     private Collection<JobDetails> getCompletedJobDetails() {
         return Stream.concat(
-                        getPartialExecutionGraphInfo()
-                                
.map(ExecutionGraphInfo::getArchivedExecutionGraph)
-                                .map(JobDetails::createDetailsForJob),
+                        
getPartialExecutionGraphInfo().map(JobDetails::createDetailsForJob),
                         archivedApplicationStore.getJobDetails().stream())
                 .collect(Collectors.toList());
     }
@@ -1276,7 +1274,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                             // is it a completed job?
                             final Optional<JobDetails> optionalJobDetails =
                                     getExecutionGraphInfoFromStore(jobId)
-                                            
.map(ExecutionGraphInfo::getArchivedExecutionGraph)
                                             
.map(JobDetails::createDetailsForJob);
                             return optionalJobDetails
                                     .map(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
index 002c816b880..14ea186621f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
@@ -217,9 +217,7 @@ public class FileArchivedApplicationStore implements 
ArchivedApplicationStore {
             applicationIdToJobIds
                     .computeIfAbsent(applicationId, ignored -> new 
ArrayList<>())
                     .add(jobId);
-            jobDetailsCache.put(
-                    jobId,
-                    
JobDetails.createDetailsForJob(executionGraphInfo.getArchivedExecutionGraph()));
+            jobDetailsCache.put(jobId, 
JobDetails.createDetailsForJob(executionGraphInfo));
         }
 
         storeArchivedApplication(archivedApplication);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
index 3dec94c60d6..058adbfd8f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
@@ -135,7 +135,6 @@ public class MemoryArchivedApplicationStore implements 
ArchivedApplicationStore
     public Collection<JobDetails> getJobDetails() {
         return archivedApplicationCache.asMap().values().stream()
                 .flatMap(archivedApplication -> 
archivedApplication.getJobs().values().stream())
-                .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                 .map(JobDetails::createDetailsForJob)
                 .collect(Collectors.toList());
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 2aaa70876cf..ece9c2bb8d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -190,11 +190,7 @@ public class CheckpointResourcesCleanupRunner implements 
JobManagerRunner {
 
     @Override
     public CompletableFuture<JobDetails> requestJobDetails(Duration timeout) {
-        return requestJob(timeout)
-                .thenApply(
-                        executionGraphInfo ->
-                                JobDetails.createDetailsForJob(
-                                        
executionGraphInfo.getArchivedExecutionGraph()));
+        return requestJob(timeout).thenApply(JobDetails::createDetailsForJob);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
index bd0c5e53a69..2884ce3b470 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
@@ -209,11 +209,7 @@ public class JobMasterServiceLeadershipRunner implements 
JobManagerRunner, Leade
 
     @Override
     public CompletableFuture<JobDetails> requestJobDetails(Duration timeout) {
-        return requestJob(timeout)
-                .thenApply(
-                        executionGraphInfo ->
-                                JobDetails.createDetailsForJob(
-                                        
executionGraphInfo.getArchivedExecutionGraph()));
+        return requestJob(timeout).thenApply(JobDetails::createDetailsForJob);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
index abea8264b2f..99f1a0ee5ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.application.ArchivedApplication;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer;
 import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer;
-import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -214,7 +213,6 @@ public class ApplicationDetailsInfo implements 
ResponseBody, Serializable {
         }
         final Collection<JobDetails> jobs =
                 archivedApplication.getJobs().values().stream()
-                        .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                         .map(JobDetails::createDetailsForJob)
                         .collect(Collectors.toList());
         return new ApplicationDetailsInfo(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index f9e308b5129..99d9857001f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -21,13 +21,16 @@ package org.apache.flink.runtime.messages.webmonitor;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -36,12 +39,15 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -54,6 +60,8 @@ public class JobDetails implements Serializable {
 
     private static final String FIELD_NAME_JOB_ID = "jid";
     private static final String FIELD_NAME_JOB_NAME = "name";
+    private static final String FIELD_NAME_JOB_TYPE = "jobType";
+    private static final String FIELD_NAME_JOB_SCHEDULER = "schedulerType";
     private static final String FIELD_NAME_START_TIME = "start-time";
     private static final String FIELD_NAME_END_TIME = "end-time";
     private static final String FIELD_NAME_DURATION = "duration";
@@ -67,6 +75,9 @@ public class JobDetails implements Serializable {
 
     private final String jobName;
 
+    @Nullable private final JobType jobType;
+    @Nullable private final JobManagerOptions.SchedulerType schedulerType;
+
     private final long startTime;
 
     private final long endTime;
@@ -100,6 +111,9 @@ public class JobDetails implements Serializable {
             @JsonProperty(FIELD_NAME_JOB_ID) @JsonDeserialize(using = 
JobIDDeserializer.class)
                     JobID jobId,
             @JsonProperty(FIELD_NAME_JOB_NAME) String jobName,
+            @Nullable @JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
+            @Nullable @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+                    JobManagerOptions.SchedulerType schedulerType,
             @JsonProperty(FIELD_NAME_START_TIME) long startTime,
             @JsonProperty(FIELD_NAME_END_TIME) long endTime,
             @JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -110,6 +124,8 @@ public class JobDetails implements Serializable {
         this(
                 jobId,
                 jobName,
+                jobType,
+                schedulerType,
                 startTime,
                 endTime,
                 duration,
@@ -159,6 +175,8 @@ public class JobDetails implements Serializable {
         this(
                 jobId,
                 jobName,
+                null,
+                null,
                 startTime,
                 endTime,
                 duration,
@@ -173,6 +191,8 @@ public class JobDetails implements Serializable {
     public JobDetails(
             JobID jobId,
             String jobName,
+            @Nullable JobType jobType,
+            @Nullable JobManagerOptions.SchedulerType schedulerType,
             long startTime,
             long endTime,
             long duration,
@@ -184,6 +204,8 @@ public class JobDetails implements Serializable {
             int pendingOperators) {
         this.jobId = checkNotNull(jobId);
         this.jobName = checkNotNull(jobName);
+        this.jobType = jobType;
+        this.schedulerType = schedulerType;
         this.startTime = startTime;
         this.endTime = endTime;
         this.duration = duration;
@@ -199,7 +221,8 @@ public class JobDetails implements Serializable {
         this.pendingOperators = pendingOperators;
     }
 
-    public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
+    public static JobDetails createDetailsForJob(ExecutionGraphInfo 
executionGraphInfo) {
+        final AccessExecutionGraph job = 
executionGraphInfo.getArchivedExecutionGraph();
         JobStatus status = job.getState();
 
         long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
@@ -241,6 +264,8 @@ public class JobDetails implements Serializable {
         return new JobDetails(
                 job.getJobID(),
                 job.getJobName(),
+                job.getJobType(),
+                executionGraphInfo.getSchedulerType(),
                 started,
                 finished,
                 duration,
@@ -265,6 +290,18 @@ public class JobDetails implements Serializable {
         return jobName;
     }
 
+    @Nullable
+    @JsonProperty(FIELD_NAME_JOB_TYPE)
+    public JobType getJobType() {
+        return jobType;
+    }
+
+    @Nullable
+    @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return schedulerType;
+    }
+
     @JsonProperty(FIELD_NAME_START_TIME)
     public long getStartTime() {
         return startTime;
@@ -350,6 +387,8 @@ public class JobDetails implements Serializable {
                     && this.status == that.status
                     && this.jobId.equals(that.jobId)
                     && this.jobName.equals(that.jobName)
+                    && Objects.equals(jobType, that.jobType)
+                    && Objects.equals(schedulerType, that.schedulerType)
                     && Arrays.equals(this.tasksPerState, that.tasksPerState)
                     && 
this.currentExecutionAttempts.equals(that.currentExecutionAttempts)
                     && this.pendingOperators == that.pendingOperators;
@@ -362,6 +401,8 @@ public class JobDetails implements Serializable {
     public int hashCode() {
         int result = jobId.hashCode();
         result = 31 * result + jobName.hashCode();
+        result = 31 * result + Objects.hashCode(jobType);
+        result = 31 * result + Objects.hashCode(schedulerType);
         result = 31 * result + (int) (startTime ^ (startTime >>> 32));
         result = 31 * result + (int) (endTime ^ (endTime >>> 32));
         result = 31 * result + status.hashCode();
@@ -381,6 +422,12 @@ public class JobDetails implements Serializable {
                 + ", jobName='"
                 + jobName
                 + '\''
+                + ", jobType='"
+                + jobType
+                + '\''
+                + ", schedulerType='"
+                + schedulerType
+                + '\''
                 + ", startTime="
                 + startTime
                 + ", endTime="
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 4fb246e26c3..7d55c262398 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -39,9 +39,10 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import 
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
@@ -58,8 +59,8 @@ import java.util.concurrent.Executor;
 
 /** Handler returning the details for the specified job. */
 public class JobDetailsHandler
-        extends AbstractAccessExecutionGraphHandler<JobDetailsInfo, 
JobMessageParameters>
-        implements OnlyExecutionGraphJsonArchivist {
+        extends AbstractExecutionGraphHandler<JobDetailsInfo, 
JobMessageParameters>
+        implements JsonArchivist {
 
     private final MetricFetcher metricFetcher;
 
@@ -84,27 +85,33 @@ public class JobDetailsHandler
 
     @Override
     protected JobDetailsInfo handleRequest(
-            HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph 
executionGraph)
+            HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo 
executionGraphInfo)
             throws RestHandlerException {
         metricFetcher.update();
-        return createJobDetailsInfo(executionGraph, 
metricFetcher.getMetricStore().getJobs());
+        return createJobDetailsInfo(executionGraphInfo, 
metricFetcher.getMetricStore().getJobs());
     }
 
     @Override
-    public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph 
graph)
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo 
executionGraphInfo)
             throws IOException {
-        ResponseBody json = createJobDetailsInfo(graph, null);
+        ResponseBody json = createJobDetailsInfo(executionGraphInfo, null);
         String path =
                 getMessageHeaders()
                         .getTargetRestEndpointURL()
-                        .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+                        .replace(
+                                ':' + JobIDPathParameter.KEY,
+                                executionGraphInfo
+                                        .getArchivedExecutionGraph()
+                                        .getJobID()
+                                        .toString());
         return Collections.singleton(new ArchivedJson(path, json));
     }
 
     private static JobDetailsInfo createJobDetailsInfo(
-            AccessExecutionGraph executionGraph,
+            ExecutionGraphInfo executionGraphInfo,
             @Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
         final long now = System.currentTimeMillis();
+        final AccessExecutionGraph executionGraph = 
executionGraphInfo.getArchivedExecutionGraph();
         final long startTime = 
executionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
         final long endTime =
                 executionGraph.getState().isGloballyTerminalState()
@@ -153,6 +160,7 @@ public class JobDetailsHandler
                 executionGraph.isStoppable(),
                 executionGraph.getState(),
                 executionGraph.getJobType(),
+                executionGraphInfo.getSchedulerType(),
                 startTime,
                 endTime,
                 duration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
index 78c57d6caee..92b48f2b035 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -29,9 +29,10 @@ import 
org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import 
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nonnull;
@@ -47,7 +48,7 @@ import java.util.concurrent.CompletableFuture;
 public class JobsOverviewHandler
         extends AbstractRestHandler<
                 RestfulGateway, EmptyRequestBody, MultipleJobsDetails, 
EmptyMessageParameters>
-        implements OnlyExecutionGraphJsonArchivist {
+        implements JsonArchivist {
 
     public JobsOverviewHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -66,11 +67,12 @@ public class JobsOverviewHandler
     }
 
     @Override
-    public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph 
graph)
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo 
executionGraphInfo)
             throws IOException {
+        final AccessExecutionGraph graph = 
executionGraphInfo.getArchivedExecutionGraph();
         ResponseBody json =
                 new MultipleJobsDetails(
-                        
Collections.singleton(JobDetails.createDetailsForJob(graph)));
+                        
Collections.singleton(JobDetails.createDetailsForJob(executionGraphInfo)));
         String path =
                 getMessageHeaders()
                         .getTargetRestEndpointURL()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
index 2f646d3b387..096b8a188ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.messages.job;
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobType;
@@ -66,6 +67,8 @@ public class JobDetailsInfo implements ResponseBody {
 
     public static final String FIELD_NAME_JOB_TYPE = "job-type";
 
+    public static final String FIELD_NAME_JOB_SCHEDULER = "schedulerType";
+
     public static final String FIELD_NAME_START_TIME = "start-time";
 
     public static final String FIELD_NAME_END_TIME = "end-time";
@@ -111,6 +114,10 @@ public class JobDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_JOB_TYPE)
     private final JobType jobType;
 
+    @Nullable
+    @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+    private final JobManagerOptions.SchedulerType schedulerType;
+
     @JsonProperty(FIELD_NAME_START_TIME)
     private final long startTime;
 
@@ -161,6 +168,8 @@ public class JobDetailsInfo implements ResponseBody {
             @JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
             @JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
             @JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
+            @Nullable @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+                    JobManagerOptions.SchedulerType schedulerType,
             @JsonProperty(FIELD_NAME_START_TIME) long startTime,
             @JsonProperty(FIELD_NAME_END_TIME) long endTime,
             @JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -181,6 +190,7 @@ public class JobDetailsInfo implements ResponseBody {
         this.isStoppable = isStoppable;
         this.jobStatus = Preconditions.checkNotNull(jobStatus);
         this.jobType = Preconditions.checkNotNull(jobType);
+        this.schedulerType = schedulerType;
         this.startTime = startTime;
         this.endTime = endTime;
         this.duration = duration;
@@ -213,6 +223,7 @@ public class JobDetailsInfo implements ResponseBody {
                 && Objects.equals(name, that.name)
                 && jobStatus == that.jobStatus
                 && jobType == that.jobType
+                && Objects.equals(schedulerType, that.schedulerType)
                 && Objects.equals(applicationId, that.applicationId)
                 && Objects.equals(timestamps, that.timestamps)
                 && Objects.equals(jobVertexInfos, that.jobVertexInfos)
@@ -230,6 +241,7 @@ public class JobDetailsInfo implements ResponseBody {
                 isStoppable,
                 jobStatus,
                 jobType,
+                schedulerType,
                 startTime,
                 endTime,
                 duration,
@@ -274,6 +286,12 @@ public class JobDetailsInfo implements ResponseBody {
         return jobType;
     }
 
+    @Nullable
+    @JsonIgnore
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return schedulerType;
+    }
+
     @JsonIgnore
     public long getStartTime() {
         return startTime;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 38bdaed8c2b..3c93510427d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.FailureEnricher.Context;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -239,6 +240,11 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
                 .forEach(ev -> 
cancelAllPendingSlotRequestsForVertex(ev.getId()));
     }
 
+    @Override
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return JobManagerOptions.SchedulerType.Default;
+    }
+
     @Override
     protected void startSchedulingInternal() {
         log.info(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
index b25996ddc54..1280b625d0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Optional;
@@ -38,6 +41,7 @@ public class ExecutionGraphInfo implements Serializable {
 
     private final ArchivedExecutionGraph executionGraph;
     private final Iterable<RootExceptionHistoryEntry> exceptionHistory;
+    @Nullable private final JobManagerOptions.SchedulerType schedulerType;
 
     public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
         this(
@@ -52,8 +56,16 @@ public class ExecutionGraphInfo implements Serializable {
     public ExecutionGraphInfo(
             ArchivedExecutionGraph executionGraph,
             Iterable<RootExceptionHistoryEntry> exceptionHistory) {
+        this(executionGraph, exceptionHistory, null);
+    }
+
+    public ExecutionGraphInfo(
+            ArchivedExecutionGraph executionGraph,
+            Iterable<RootExceptionHistoryEntry> exceptionHistory,
+            JobManagerOptions.SchedulerType schedulerType) {
         this.executionGraph = executionGraph;
         this.exceptionHistory = exceptionHistory;
+        this.schedulerType = schedulerType;
     }
 
     public JobID getJobId() {
@@ -71,4 +83,15 @@ public class ExecutionGraphInfo implements Serializable {
     public Optional<ApplicationID> getApplicationId() {
         return executionGraph.getApplicationId();
     }
+
+    /**
+     * Returns the scheduler type of the current execution graph info.
+     *
+     * @return The scheduler type of the current execution graph info. Returns {@code null} if
+     *     the scheduler type was not provided when this info was created.
+     */
+    @Nullable
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return schedulerType;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index b7e03f8058c..1b50f86600a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -870,7 +870,9 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
     public ExecutionGraphInfo requestJob() {
         mainThreadExecutor.assertRunningInMainThread();
         return new ExecutionGraphInfo(
-                ArchivedExecutionGraph.createFrom(executionGraph), 
getExceptionHistory());
+                ArchivedExecutionGraph.createFrom(executionGraph),
+                getExceptionHistory(),
+                getSchedulerType());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index efdb163b994..4b1fadc2619 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
@@ -73,6 +74,12 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {
 
+    /**
+     * Get the {@link 
org.apache.flink.configuration.JobManagerOptions.SchedulerType} of the current
+     * job.
+     */
+    JobManagerOptions.SchedulerType getSchedulerType();
+
     void startScheduling();
 
     void cancel();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 8fb59ef1707..813b6132f0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -714,6 +714,11 @@ public class AdaptiveScheduler
                 "newResourcesAvailable");
     }
 
+    @Override
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return JobManagerOptions.SchedulerType.Adaptive;
+    }
+
     @Override
     public void startScheduling() {
         checkIdleSlotTimeout();
@@ -844,7 +849,8 @@ public class AdaptiveScheduler
 
     @Override
     public ExecutionGraphInfo requestJob() {
-        return new ExecutionGraphInfo(state.getJob(), 
exceptionHistory.toArrayList());
+        return new ExecutionGraphInfo(
+                state.getJob(), exceptionHistory.toArrayList(), 
getSchedulerType());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index c8e31f4cdab..b73985d15e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.JobException;
@@ -251,6 +252,11 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler implements JobGraph
         }
     }
 
+    @Override
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return JobManagerOptions.SchedulerType.AdaptiveBatch;
+    }
+
     @Override
     public void onNewJobVerticesAdded(List<JobVertex> newVertices, int 
pendingOperatorsCount)
             throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
index 61e027f343a..70d508c2242 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
@@ -227,7 +227,6 @@ public class ArchivedApplicationStoreTestUtils {
             Collection<ArchivedApplication> archivedApplications) {
         return archivedApplications.stream()
                 .flatMap(archivedApplication -> 
archivedApplication.getJobs().values().stream())
-                .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                 .map(JobDetails::createDetailsForJob)
                 .collect(Collectors.toList());
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ea40b2e6e42..ef838b8bbe6 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -1679,11 +1679,13 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 .setJobDetailsFunction(
                         () ->
                                 JobDetails.createDetailsForJob(
-                                        new ArchivedExecutionGraphBuilder()
-                                                .setJobID(jobId)
-                                                .setState(currentJobStatus)
-                                                
.setStateTimestamps(stateTimeStampsForRunningJob)
-                                                .build()))
+                                        new ExecutionGraphInfo(
+                                                new 
ArchivedExecutionGraphBuilder()
+                                                        .setJobID(jobId)
+                                                        
.setState(currentJobStatus)
+                                                        .setStateTimestamps(
+                                                                
stateTimeStampsForRunningJob)
+                                                        .build())))
                 .build();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
index 1ecb68cdf7d..5ca09de4ba9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.HandlerRequestException;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.testutils.TestingUtils;
@@ -112,7 +114,9 @@ class JobDetailsHandlerTest {
     @Test
     void testGetJobDetailsWithStreamGraphJson() throws RestHandlerException {
         JobDetailsInfo jobDetailsInfo =
-                jobDetailsHandler.handleRequest(handlerRequest, 
archivedExecutionGraph);
+                jobDetailsHandler.handleRequest(
+                        handlerRequest,
+                        new ExecutionGraphInfo((ArchivedExecutionGraph) 
archivedExecutionGraph));
         assertThat(jobDetailsInfo.getStreamGraphJson())
                 .isEqualTo(new 
JobPlanInfo.RawJson(expectedStreamGraphJson).toString());
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
index 9d0e5bd45ef..109972db7ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -30,6 +31,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -88,7 +90,9 @@ class JobsOverviewHandlerTest {
                                                 new MultipleJobsDetails(
                                                         Collections.singleton(
                                                                 
JobDetails.createDetailsForJob(
-                                                                        
archivedExecutionGraph)))))
+                                                                        new 
ExecutionGraphInfo(
+                                                                               
 (ArchivedExecutionGraph)
+                                                                               
         archivedExecutionGraph))))))
                         .build();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index 50bbe812182..864ba9981f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -78,6 +78,7 @@ class JobDetailsInfoTest extends 
RestResponseMarshallingTestBase<JobDetailsInfo>
                 true,
                 JobStatus.values()[random.nextInt(JobStatus.values().length)],
                 JobType.STREAMING,
+                null,
                 1L,
                 2L,
                 1L,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 7106f257602..8f3f7f5c870 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
@@ -91,6 +92,11 @@ public class TestingSchedulerNG implements SchedulerNG {
         this.updateJobResourceRequirementsConsumer = 
updateJobResourceRequirementsConsumer;
     }
 
+    @Override
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return JobManagerOptions.SchedulerType.Default;
+    }
+
     @Override
     public void startScheduling() {
         startSchedulingRunnable.run();

Reply via email to