This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 26a0012b14a [FLINK-38892][runtime/rest] Introduce the scheduler and
type information of the target job in the JobDetails(Info).java (#27544)
26a0012b14a is described below
commit 26a0012b14af1adba2a4772bfc257e4f6196b9b8
Author: Yuepeng Pan <[email protected]>
AuthorDate: Fri Feb 27 17:55:24 2026 +0800
[FLINK-38892][runtime/rest] Introduce the scheduler and type information of
the target job in the JobDetails(Info).java (#27544)
---
.../shortcodes/generated/rest_v1_dispatcher.html | 12 ++++++
docs/static/generated/rest_v1_dispatcher.yml | 12 ++++++
.../client/program/rest/RestClusterClientTest.java | 1 +
.../webmonitor/history/HistoryServerTest.java | 3 +-
.../src/test/resources/rest_api_v1.snapshot | 20 +++++++++
.../flink/runtime/dispatcher/Dispatcher.java | 5 +--
.../dispatcher/FileArchivedApplicationStore.java | 4 +-
.../dispatcher/MemoryArchivedApplicationStore.java | 1 -
.../cleanup/CheckpointResourcesCleanupRunner.java | 6 +--
.../JobMasterServiceLeadershipRunner.java | 6 +--
.../webmonitor/ApplicationDetailsInfo.java | 2 -
.../runtime/messages/webmonitor/JobDetails.java | 49 +++++++++++++++++++++-
.../rest/handler/job/JobDetailsHandler.java | 26 ++++++++----
.../rest/handler/job/JobsOverviewHandler.java | 10 +++--
.../runtime/rest/messages/job/JobDetailsInfo.java | 18 ++++++++
.../flink/runtime/scheduler/DefaultScheduler.java | 6 +++
.../runtime/scheduler/ExecutionGraphInfo.java | 23 ++++++++++
.../flink/runtime/scheduler/SchedulerBase.java | 4 +-
.../flink/runtime/scheduler/SchedulerNG.java | 7 ++++
.../scheduler/adaptive/AdaptiveScheduler.java | 8 +++-
.../adaptivebatch/AdaptiveBatchScheduler.java | 6 +++
.../ArchivedApplicationStoreTestUtils.java | 1 -
.../flink/runtime/dispatcher/DispatcherTest.java | 12 +++---
.../rest/handler/job/JobDetailsHandlerTest.java | 6 ++-
.../rest/handler/job/JobsOverviewHandlerTest.java | 6 ++-
.../rest/messages/job/JobDetailsInfoTest.java | 1 +
.../runtime/scheduler/TestingSchedulerNG.java | 6 +++
27 files changed, 215 insertions(+), 46 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 782750b9e88..f7528c81261 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1425,6 +1425,14 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"name" : {
"type" : "string"
},
+ "jobType" : {
+ "type" : "string",
+ "enum" : [ "BATCH", "STREAMING" ]
+ },
+ "schedulerType" : {
+ "type" : "string",
+ "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+ },
"pending-operators" : {
"type" : "integer"
},
@@ -1510,6 +1518,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"type" : "string",
"enum" : [ "BATCH", "STREAMING" ]
},
+ "schedulerType" : {
+ "type" : "string",
+ "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+ },
"maxParallelism" : {
"type" : "integer"
},
diff --git a/docs/static/generated/rest_v1_dispatcher.yml
b/docs/static/generated/rest_v1_dispatcher.yml
index df0c1258a2c..1737d17dbd1 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2511,6 +2511,10 @@ components:
format: int64
name:
type: string
+ jobType:
+ $ref: "#/components/schemas/JobType"
+ schedulerType:
+ $ref: "#/components/schemas/SchedulerType"
pending-operators:
type: integer
format: int32
@@ -2541,6 +2545,8 @@ components:
$ref: "#/components/schemas/JobID"
job-type:
$ref: "#/components/schemas/JobType"
+ schedulerType:
+ $ref: "#/components/schemas/SchedulerType"
maxParallelism:
type: integer
format: int64
@@ -3037,6 +3043,12 @@ components:
type: string
triggerId:
$ref: "#/components/schemas/TriggerId"
+ SchedulerType:
+ type: string
+ enum:
+ - Default
+ - Adaptive
+ - AdaptiveBatch
SerializedThrowable:
type: object
properties:
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 0c4fc83384e..ca2e1927f4b 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -1272,6 +1272,7 @@ class RestClusterClientTest {
false,
JobStatus.RUNNING,
JobType.STREAMING,
+ null,
1,
2,
1,
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index c67a3147628..9d5f912fd26 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -922,8 +922,7 @@ class HistoryServerTest {
JobsOverviewHeaders.URL,
new MultipleJobsDetails(
Collections.singleton(
- JobDetails.createDetailsForJob(
-
executionGraphInfo.getArchivedExecutionGraph()))));
+
JobDetails.createDetailsForJob(executionGraphInfo))));
FsJsonArchivist.writeArchivedJsons(
ArchivePathUtils.getJobArchivePath(clusterConfig, jobId,
applicationId),
Collections.singletonList(archivedJobsOverview));
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 970bd84a287..7cebf7c7f9e 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -110,6 +110,14 @@
"name" : {
"type" : "string"
},
+ "jobType" : {
+ "type" : "string",
+ "enum" : [ "BATCH", "STREAMING" ]
+ },
+ "schedulerType" : {
+ "type" : "string",
+ "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+ },
"start-time" : {
"type" : "integer"
},
@@ -1196,6 +1204,14 @@
"name" : {
"type" : "string"
},
+ "jobType" : {
+ "type" : "string",
+ "enum" : [ "BATCH", "STREAMING" ]
+ },
+ "schedulerType" : {
+ "type" : "string",
+ "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+ },
"start-time" : {
"type" : "integer"
},
@@ -1267,6 +1283,10 @@
"type" : "string",
"enum" : [ "BATCH", "STREAMING" ]
},
+ "schedulerType" : {
+ "type" : "string",
+ "enum" : [ "Default", "Adaptive", "AdaptiveBatch" ]
+ },
"start-time" : {
"type" : "integer"
},
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b5f8cc26e19..4d79622e6b7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -1164,9 +1164,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private Collection<JobDetails> getCompletedJobDetails() {
return Stream.concat(
- getPartialExecutionGraphInfo()
-
.map(ExecutionGraphInfo::getArchivedExecutionGraph)
- .map(JobDetails::createDetailsForJob),
+
getPartialExecutionGraphInfo().map(JobDetails::createDetailsForJob),
archivedApplicationStore.getJobDetails().stream())
.collect(Collectors.toList());
}
@@ -1276,7 +1274,6 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
// is it a completed job?
final Optional<JobDetails> optionalJobDetails =
getExecutionGraphInfoFromStore(jobId)
-
.map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob);
return optionalJobDetails
.map(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
index 002c816b880..14ea186621f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
@@ -217,9 +217,7 @@ public class FileArchivedApplicationStore implements
ArchivedApplicationStore {
applicationIdToJobIds
.computeIfAbsent(applicationId, ignored -> new
ArrayList<>())
.add(jobId);
- jobDetailsCache.put(
- jobId,
-
JobDetails.createDetailsForJob(executionGraphInfo.getArchivedExecutionGraph()));
+ jobDetailsCache.put(jobId,
JobDetails.createDetailsForJob(executionGraphInfo));
}
storeArchivedApplication(archivedApplication);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
index 3dec94c60d6..058adbfd8f3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
@@ -135,7 +135,6 @@ public class MemoryArchivedApplicationStore implements
ArchivedApplicationStore
public Collection<JobDetails> getJobDetails() {
return archivedApplicationCache.asMap().values().stream()
.flatMap(archivedApplication ->
archivedApplication.getJobs().values().stream())
- .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 2aaa70876cf..ece9c2bb8d6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -190,11 +190,7 @@ public class CheckpointResourcesCleanupRunner implements
JobManagerRunner {
@Override
public CompletableFuture<JobDetails> requestJobDetails(Duration timeout) {
- return requestJob(timeout)
- .thenApply(
- executionGraphInfo ->
- JobDetails.createDetailsForJob(
-
executionGraphInfo.getArchivedExecutionGraph()));
+ return requestJob(timeout).thenApply(JobDetails::createDetailsForJob);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
index bd0c5e53a69..2884ce3b470 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
@@ -209,11 +209,7 @@ public class JobMasterServiceLeadershipRunner implements
JobManagerRunner, Leade
@Override
public CompletableFuture<JobDetails> requestJobDetails(Duration timeout) {
- return requestJob(timeout)
- .thenApply(
- executionGraphInfo ->
- JobDetails.createDetailsForJob(
-
executionGraphInfo.getArchivedExecutionGraph()));
+ return requestJob(timeout).thenApply(JobDetails::createDetailsForJob);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
index abea8264b2f..99f1a0ee5ad 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
@@ -24,7 +24,6 @@ import
org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer;
-import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
@@ -214,7 +213,6 @@ public class ApplicationDetailsInfo implements
ResponseBody, Serializable {
}
final Collection<JobDetails> jobs =
archivedApplication.getJobs().values().stream()
- .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
return new ApplicationDetailsInfo(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index f9e308b5129..99d9857001f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -21,13 +21,16 @@ package org.apache.flink.runtime.messages.webmonitor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.Preconditions;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -36,12 +39,15 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -54,6 +60,8 @@ public class JobDetails implements Serializable {
private static final String FIELD_NAME_JOB_ID = "jid";
private static final String FIELD_NAME_JOB_NAME = "name";
+ private static final String FIELD_NAME_JOB_TYPE = "jobType";
+ private static final String FIELD_NAME_JOB_SCHEDULER = "schedulerType";
private static final String FIELD_NAME_START_TIME = "start-time";
private static final String FIELD_NAME_END_TIME = "end-time";
private static final String FIELD_NAME_DURATION = "duration";
@@ -67,6 +75,9 @@ public class JobDetails implements Serializable {
private final String jobName;
+ @Nullable private final JobType jobType;
+ @Nullable private final JobManagerOptions.SchedulerType schedulerType;
+
private final long startTime;
private final long endTime;
@@ -100,6 +111,9 @@ public class JobDetails implements Serializable {
@JsonProperty(FIELD_NAME_JOB_ID) @JsonDeserialize(using =
JobIDDeserializer.class)
JobID jobId,
@JsonProperty(FIELD_NAME_JOB_NAME) String jobName,
+ @Nullable @JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
+ @Nullable @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+ JobManagerOptions.SchedulerType schedulerType,
@JsonProperty(FIELD_NAME_START_TIME) long startTime,
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -110,6 +124,8 @@ public class JobDetails implements Serializable {
this(
jobId,
jobName,
+ jobType,
+ schedulerType,
startTime,
endTime,
duration,
@@ -159,6 +175,8 @@ public class JobDetails implements Serializable {
this(
jobId,
jobName,
+ null,
+ null,
startTime,
endTime,
duration,
@@ -173,6 +191,8 @@ public class JobDetails implements Serializable {
public JobDetails(
JobID jobId,
String jobName,
+ @Nullable JobType jobType,
+ @Nullable JobManagerOptions.SchedulerType schedulerType,
long startTime,
long endTime,
long duration,
@@ -184,6 +204,8 @@ public class JobDetails implements Serializable {
int pendingOperators) {
this.jobId = checkNotNull(jobId);
this.jobName = checkNotNull(jobName);
+ this.jobType = jobType;
+ this.schedulerType = schedulerType;
this.startTime = startTime;
this.endTime = endTime;
this.duration = duration;
@@ -199,7 +221,8 @@ public class JobDetails implements Serializable {
this.pendingOperators = pendingOperators;
}
- public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
+ public static JobDetails createDetailsForJob(ExecutionGraphInfo
executionGraphInfo) {
+ final AccessExecutionGraph job =
executionGraphInfo.getArchivedExecutionGraph();
JobStatus status = job.getState();
long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
@@ -241,6 +264,8 @@ public class JobDetails implements Serializable {
return new JobDetails(
job.getJobID(),
job.getJobName(),
+ job.getJobType(),
+ executionGraphInfo.getSchedulerType(),
started,
finished,
duration,
@@ -265,6 +290,18 @@ public class JobDetails implements Serializable {
return jobName;
}
+ @Nullable
+ @JsonProperty(FIELD_NAME_JOB_TYPE)
+ public JobType getJobType() {
+ return jobType;
+ }
+
+ @Nullable
+ @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return schedulerType;
+ }
+
@JsonProperty(FIELD_NAME_START_TIME)
public long getStartTime() {
return startTime;
@@ -350,6 +387,8 @@ public class JobDetails implements Serializable {
&& this.status == that.status
&& this.jobId.equals(that.jobId)
&& this.jobName.equals(that.jobName)
+ && Objects.equals(jobType, that.jobType)
+ && Objects.equals(schedulerType, that.schedulerType)
&& Arrays.equals(this.tasksPerState, that.tasksPerState)
&&
this.currentExecutionAttempts.equals(that.currentExecutionAttempts)
&& this.pendingOperators == that.pendingOperators;
@@ -362,6 +401,8 @@ public class JobDetails implements Serializable {
public int hashCode() {
int result = jobId.hashCode();
result = 31 * result + jobName.hashCode();
+ result = 31 * result + Objects.hashCode(jobType);
+ result = 31 * result + Objects.hashCode(schedulerType);
result = 31 * result + (int) (startTime ^ (startTime >>> 32));
result = 31 * result + (int) (endTime ^ (endTime >>> 32));
result = 31 * result + status.hashCode();
@@ -381,6 +422,12 @@ public class JobDetails implements Serializable {
+ ", jobName='"
+ jobName
+ '\''
+ + ", jobType='"
+ + jobType
+ + '\''
+ + ", schedulerType='"
+ + schedulerType
+ + '\''
+ ", startTime="
+ startTime
+ ", endTime="
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 4fb246e26c3..7d55c262398 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -39,9 +39,10 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
@@ -58,8 +59,8 @@ import java.util.concurrent.Executor;
/** Handler returning the details for the specified job. */
public class JobDetailsHandler
- extends AbstractAccessExecutionGraphHandler<JobDetailsInfo,
JobMessageParameters>
- implements OnlyExecutionGraphJsonArchivist {
+ extends AbstractExecutionGraphHandler<JobDetailsInfo,
JobMessageParameters>
+ implements JsonArchivist {
private final MetricFetcher metricFetcher;
@@ -84,27 +85,33 @@ public class JobDetailsHandler
@Override
protected JobDetailsInfo handleRequest(
- HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph
executionGraph)
+ HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo
executionGraphInfo)
throws RestHandlerException {
metricFetcher.update();
- return createJobDetailsInfo(executionGraph,
metricFetcher.getMetricStore().getJobs());
+ return createJobDetailsInfo(executionGraphInfo,
metricFetcher.getMetricStore().getJobs());
}
@Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph
graph)
+ public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo
executionGraphInfo)
throws IOException {
- ResponseBody json = createJobDetailsInfo(graph, null);
+ ResponseBody json = createJobDetailsInfo(executionGraphInfo, null);
String path =
getMessageHeaders()
.getTargetRestEndpointURL()
- .replace(':' + JobIDPathParameter.KEY,
graph.getJobID().toString());
+ .replace(
+ ':' + JobIDPathParameter.KEY,
+ executionGraphInfo
+ .getArchivedExecutionGraph()
+ .getJobID()
+ .toString());
return Collections.singleton(new ArchivedJson(path, json));
}
private static JobDetailsInfo createJobDetailsInfo(
- AccessExecutionGraph executionGraph,
+ ExecutionGraphInfo executionGraphInfo,
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
final long now = System.currentTimeMillis();
+ final AccessExecutionGraph executionGraph =
executionGraphInfo.getArchivedExecutionGraph();
final long startTime =
executionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
final long endTime =
executionGraph.getState().isGloballyTerminalState()
@@ -153,6 +160,7 @@ public class JobDetailsHandler
executionGraph.isStoppable(),
executionGraph.getState(),
executionGraph.getJobType(),
+ executionGraphInfo.getSchedulerType(),
startTime,
endTime,
duration,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
index 78c57d6caee..92b48f2b035 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -29,9 +29,10 @@ import
org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
@@ -47,7 +48,7 @@ import java.util.concurrent.CompletableFuture;
public class JobsOverviewHandler
extends AbstractRestHandler<
RestfulGateway, EmptyRequestBody, MultipleJobsDetails,
EmptyMessageParameters>
- implements OnlyExecutionGraphJsonArchivist {
+ implements JsonArchivist {
public JobsOverviewHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -66,11 +67,12 @@ public class JobsOverviewHandler
}
@Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph
graph)
+ public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo
executionGraphInfo)
throws IOException {
+ final AccessExecutionGraph graph =
executionGraphInfo.getArchivedExecutionGraph();
ResponseBody json =
new MultipleJobsDetails(
-
Collections.singleton(JobDetails.createDetailsForJob(graph)));
+
Collections.singleton(JobDetails.createDetailsForJob(executionGraphInfo)));
String path =
getMessageHeaders()
.getTargetRestEndpointURL()
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
index 2f646d3b387..096b8a188ea 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.messages.job;
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobType;
@@ -66,6 +67,8 @@ public class JobDetailsInfo implements ResponseBody {
public static final String FIELD_NAME_JOB_TYPE = "job-type";
+ public static final String FIELD_NAME_JOB_SCHEDULER = "schedulerType";
+
public static final String FIELD_NAME_START_TIME = "start-time";
public static final String FIELD_NAME_END_TIME = "end-time";
@@ -111,6 +114,10 @@ public class JobDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_JOB_TYPE)
private final JobType jobType;
+ @Nullable
+ @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+ private final JobManagerOptions.SchedulerType schedulerType;
+
@JsonProperty(FIELD_NAME_START_TIME)
private final long startTime;
@@ -161,6 +168,8 @@ public class JobDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
@JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
@JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
+ @Nullable @JsonProperty(FIELD_NAME_JOB_SCHEDULER)
+ JobManagerOptions.SchedulerType schedulerType,
@JsonProperty(FIELD_NAME_START_TIME) long startTime,
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -181,6 +190,7 @@ public class JobDetailsInfo implements ResponseBody {
this.isStoppable = isStoppable;
this.jobStatus = Preconditions.checkNotNull(jobStatus);
this.jobType = Preconditions.checkNotNull(jobType);
+ this.schedulerType = schedulerType;
this.startTime = startTime;
this.endTime = endTime;
this.duration = duration;
@@ -213,6 +223,7 @@ public class JobDetailsInfo implements ResponseBody {
&& Objects.equals(name, that.name)
&& jobStatus == that.jobStatus
&& jobType == that.jobType
+ && Objects.equals(schedulerType, that.schedulerType)
&& Objects.equals(applicationId, that.applicationId)
&& Objects.equals(timestamps, that.timestamps)
&& Objects.equals(jobVertexInfos, that.jobVertexInfos)
@@ -230,6 +241,7 @@ public class JobDetailsInfo implements ResponseBody {
isStoppable,
jobStatus,
jobType,
+ schedulerType,
startTime,
endTime,
duration,
@@ -274,6 +286,12 @@ public class JobDetailsInfo implements ResponseBody {
return jobType;
}
+ @Nullable
+ @JsonIgnore
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return schedulerType;
+ }
+
@JsonIgnore
public long getStartTime() {
return startTime;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 38bdaed8c2b..3c93510427d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.FailureEnricher.Context;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -239,6 +240,11 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
.forEach(ev ->
cancelAllPendingSlotRequestsForVertex(ev.getId()));
}
+ @Override
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return JobManagerOptions.SchedulerType.Default;
+ }
+
@Override
protected void startSchedulingInternal() {
log.info(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
index b25996ddc54..1280b625d0c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Collections;
import java.util.Optional;
@@ -38,6 +41,7 @@ public class ExecutionGraphInfo implements Serializable {
private final ArchivedExecutionGraph executionGraph;
private final Iterable<RootExceptionHistoryEntry> exceptionHistory;
+ @Nullable private final JobManagerOptions.SchedulerType schedulerType;
public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
this(
@@ -52,8 +56,16 @@ public class ExecutionGraphInfo implements Serializable {
public ExecutionGraphInfo(
ArchivedExecutionGraph executionGraph,
Iterable<RootExceptionHistoryEntry> exceptionHistory) {
+ this(executionGraph, exceptionHistory, null);
+ }
+
+ public ExecutionGraphInfo(
+ ArchivedExecutionGraph executionGraph,
+ Iterable<RootExceptionHistoryEntry> exceptionHistory,
+ JobManagerOptions.SchedulerType schedulerType) {
this.executionGraph = executionGraph;
this.exceptionHistory = exceptionHistory;
+ this.schedulerType = schedulerType;
}
public JobID getJobId() {
@@ -71,4 +83,15 @@ public class ExecutionGraphInfo implements Serializable {
public Optional<ApplicationID> getApplicationId() {
return executionGraph.getApplicationId();
}
+
+ /**
+ * Returns the scheduler type of the current execution graph info.
+ *
+ * @return The scheduler type of the current execution graph info. Returns
null if exceptions
+ * occurred.
+ */
+ @Nullable
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return schedulerType;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index b7e03f8058c..1b50f86600a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -870,7 +870,9 @@ public abstract class SchedulerBase implements SchedulerNG,
CheckpointScheduling
public ExecutionGraphInfo requestJob() {
mainThreadExecutor.assertRunningInMainThread();
return new ExecutionGraphInfo(
- ArchivedExecutionGraph.createFrom(executionGraph),
getExceptionHistory());
+ ArchivedExecutionGraph.createFrom(executionGraph),
+ getExceptionHistory(),
+ getSchedulerType());
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index efdb163b994..4b1fadc2619 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
@@ -73,6 +74,12 @@ import java.util.concurrent.CompletableFuture;
*/
public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {
+ /**
+ * Get the {@link
org.apache.flink.configuration.JobManagerOptions.SchedulerType} of the current
+ * job.
+ */
+ JobManagerOptions.SchedulerType getSchedulerType();
+
void startScheduling();
void cancel();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 8fb59ef1707..813b6132f0d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -714,6 +714,11 @@ public class AdaptiveScheduler
"newResourcesAvailable");
}
+ @Override
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return JobManagerOptions.SchedulerType.Adaptive;
+ }
+
@Override
public void startScheduling() {
checkIdleSlotTimeout();
@@ -844,7 +849,8 @@ public class AdaptiveScheduler
@Override
public ExecutionGraphInfo requestJob() {
- return new ExecutionGraphInfo(state.getJob(),
exceptionHistory.toArrayList());
+ return new ExecutionGraphInfo(
+ state.getJob(), exceptionHistory.toArrayList(),
getSchedulerType());
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index c8e31f4cdab..b73985d15e7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.JobException;
@@ -251,6 +252,11 @@ public class AdaptiveBatchScheduler extends
DefaultScheduler implements JobGraph
}
}
+ @Override
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return JobManagerOptions.SchedulerType.AdaptiveBatch;
+ }
+
@Override
public void onNewJobVerticesAdded(List<JobVertex> newVertices, int
pendingOperatorsCount)
throws Exception {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
index 61e027f343a..70d508c2242 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
@@ -227,7 +227,6 @@ public class ArchivedApplicationStoreTestUtils {
Collection<ArchivedApplication> archivedApplications) {
return archivedApplications.stream()
.flatMap(archivedApplication ->
archivedApplication.getJobs().values().stream())
- .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ea40b2e6e42..ef838b8bbe6 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -1679,11 +1679,13 @@ public class DispatcherTest extends
AbstractDispatcherTest {
.setJobDetailsFunction(
() ->
JobDetails.createDetailsForJob(
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobId)
- .setState(currentJobStatus)
-
.setStateTimestamps(stateTimeStampsForRunningJob)
- .build()))
+ new ExecutionGraphInfo(
+ new
ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+
.setState(currentJobStatus)
+ .setStateTimestamps(
+
stateTimeStampsForRunningJob)
+ .build())))
.build();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
index 1ecb68cdf7d..5ca09de4ba9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.testutils.TestingUtils;
@@ -112,7 +114,9 @@ class JobDetailsHandlerTest {
@Test
void testGetJobDetailsWithStreamGraphJson() throws RestHandlerException {
JobDetailsInfo jobDetailsInfo =
- jobDetailsHandler.handleRequest(handlerRequest,
archivedExecutionGraph);
+ jobDetailsHandler.handleRequest(
+ handlerRequest,
+ new ExecutionGraphInfo((ArchivedExecutionGraph)
archivedExecutionGraph));
assertThat(jobDetailsInfo.getStreamGraphJson())
.isEqualTo(new
JobPlanInfo.RawJson(expectedStreamGraphJson).toString());
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
index 9d0e5bd45ef..109972db7ac 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -30,6 +31,7 @@ import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -88,7 +90,9 @@ class JobsOverviewHandlerTest {
new MultipleJobsDetails(
Collections.singleton(
JobDetails.createDetailsForJob(
-
archivedExecutionGraph)))))
+ new
ExecutionGraphInfo(
+
(ArchivedExecutionGraph)
+
archivedExecutionGraph))))))
.build();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index 50bbe812182..864ba9981f1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -78,6 +78,7 @@ class JobDetailsInfoTest extends
RestResponseMarshallingTestBase<JobDetailsInfo>
true,
JobStatus.values()[random.nextInt(JobStatus.values().length)],
JobType.STREAMING,
+ null,
1L,
2L,
1L,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 7106f257602..8f3f7f5c870 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
@@ -91,6 +92,11 @@ public class TestingSchedulerNG implements SchedulerNG {
this.updateJobResourceRequirementsConsumer =
updateJobResourceRequirementsConsumer;
}
+ @Override
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return JobManagerOptions.SchedulerType.Default;
+ }
+
@Override
public void startScheduling() {
startSchedulingRunnable.run();