This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5263f9cbf20536e9a81a0044f22b033e78a908ea
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Wed Apr 24 17:25:08 2024 +0800

    [FLINK-35222][rest] Add getJobType for AccessExecutionGraph
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java    |  1 +
 .../JobMasterServiceLeadershipRunnerFactory.java       |  1 +
 .../cleanup/CheckpointResourcesCleanupRunner.java      |  1 +
 .../runtime/executiongraph/AccessExecutionGraph.java   |  9 +++++++++
 .../runtime/executiongraph/ArchivedExecutionGraph.java | 18 ++++++++++++++++++
 .../runtime/executiongraph/DefaultExecutionGraph.java  | 11 +++++++++++
 .../executiongraph/DefaultExecutionGraphBuilder.java   |  1 +
 .../DefaultJobMasterServiceProcessFactory.java         | 12 +++++++++++-
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java  |  1 +
 .../dispatcher/DispatcherResourceCleanupTest.java      |  1 +
 .../flink/runtime/dispatcher/DispatcherTest.java       |  7 +++++++
 .../executiongraph/ArchivedExecutionGraphTest.java     |  4 ++++
 .../jobmaster/DefaultJobMasterServiceProcessTest.java  |  2 +-
 .../JobMasterServiceLeadershipRunnerTest.java          |  1 +
 .../runtime/jobmaster/TestingJobManagerRunner.java     |  9 ++++++++-
 .../TestingJobMasterServiceProcessFactory.java         |  3 ++-
 .../TestingJobMasterServiceProcessFactoryOld.java      |  9 ++++++++-
 .../legacy/utils/ArchivedExecutionGraphBuilder.java    |  2 ++
 .../runtime/scheduler/ExecutionGraphInfoTest.java      |  4 ++++
 .../adaptive/StateTrackingMockExecutionGraph.java      |  7 +++++++
 20 files changed, 99 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5b89203095e..e138147d97d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -563,6 +563,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                         jobId,
                         jobName,
                         JobStatus.FAILED,
+                        null,
                         exception,
                         null,
                         System.currentTimeMillis());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
index bf7c69fb077..718db104ed9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
@@ -118,6 +118,7 @@ public enum JobMasterServiceLeadershipRunnerFactory implements JobManagerRunnerF
                 new DefaultJobMasterServiceProcessFactory(
                         jobGraph.getJobID(),
                         jobGraph.getName(),
+                        jobGraph.getJobType(),
                         jobGraph.getCheckpointingSettings(),
                         initializationTimestamp,
                         jobMasterServiceFactory);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 9bd8a1b499e..59128d853b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -226,6 +226,7 @@ public class CheckpointResourcesCleanupRunner implements JobManagerRunner {
                         jobResult.getJobId(),
                         "unknown",
                         getJobStatus(jobResult),
+                        null,
                         jobResult.getSerializedThrowable().orElse(null),
                         null,
                         initializationTimestamp));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 011e864a4fd..e9998b1a78f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.util.OptionalFailure;
@@ -67,6 +68,14 @@ public interface AccessExecutionGraph extends JobStatusProvider {
      */
     JobStatus getState();
 
+    /**
+     * Returns the {@link JobType} for this execution graph.
+     *
+     * @return job type for this execution graph. It may be null when an exception occurs.
+     */
+    @Nullable
+    JobType getJobType();
+
     /**
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index c0f25b870c6..3fb0a09fde1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -83,6 +84,9 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
     /** Current status of the job execution. */
     private final JobStatus state;
 
+    /** The job type of the job execution. */
+    @Nullable private final JobType jobType;
+
     /**
      * The exception that caused the job to fail. This is set to the first root exception that was
      * not recoverable and triggered job failure
@@ -115,6 +119,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
             List<ArchivedExecutionJobVertex> verticesInCreationOrder,
             long[] stateTimestamps,
             JobStatus state,
+            @Nullable JobType jobType,
             @Nullable ErrorInfo failureCause,
             String jsonPlan,
             StringifiedAccumulatorResult[] archivedUserAccumulators,
@@ -134,6 +139,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
         this.verticesInCreationOrder = Preconditions.checkNotNull(verticesInCreationOrder);
         this.stateTimestamps = Preconditions.checkNotNull(stateTimestamps);
         this.state = Preconditions.checkNotNull(state);
+        this.jobType = jobType;
         this.failureCause = failureCause;
         this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
         this.archivedUserAccumulators = Preconditions.checkNotNull(archivedUserAccumulators);
@@ -170,6 +176,11 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
         return state;
     }
 
+    @Override
+    public JobType getJobType() {
+        return jobType;
+    }
+
     @Nullable
     @Override
     public ErrorInfo getFailureInfo() {
@@ -342,6 +353,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
                 archivedVerticesInCreationOrder,
                 timestamps,
                 statusOverride == null ? executionGraph.getState() : statusOverride,
+                executionGraph.getJobType(),
                 executionGraph.getFailureInfo(),
                 executionGraph.getJsonPlan(),
                 executionGraph.getAccumulatorResultsStringified(),
@@ -364,6 +376,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
             JobID jobId,
             String jobName,
             JobStatus jobStatus,
+            @Nullable JobType jobType,
             @Nullable Throwable throwable,
             @Nullable JobCheckpointingSettings checkpointingSettings,
             long initializationTimestamp) {
@@ -371,6 +384,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
                 jobId,
                 jobName,
                 jobStatus,
+                jobType,
                 Collections.emptyMap(),
                 Collections.emptyList(),
                 throwable,
@@ -382,6 +396,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
             JobID jobId,
             String jobName,
             JobStatus jobStatus,
+            JobType jobType,
             @Nullable Throwable throwable,
             @Nullable JobCheckpointingSettings checkpointingSettings,
             long initializationTimestamp,
@@ -411,6 +426,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
                 jobId,
                 jobName,
                 jobStatus,
+                jobType,
                 archivedJobVertices,
                 archivedVerticesInCreationOrder,
                 throwable,
@@ -422,6 +438,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
             JobID jobId,
             String jobName,
             JobStatus jobStatus,
+            JobType jobType,
             Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks,
             List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder,
             @Nullable Throwable throwable,
@@ -453,6 +470,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
                 archivedVerticesInCreationOrder,
                 timestamps,
                 jobStatus,
+                jobType,
                 failureInfo,
                 jsonPlan,
                 archivedUserAccumulators,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 1a853d851b6..0fb840a25a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -220,6 +221,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
     /** Current status of the job execution. */
     private volatile JobStatus state = JobStatus.CREATED;
 
+    /** The job type of the job execution. */
+    private final JobType jobType;
+
     /** A future that completes once the job has reached a terminal state. */
     private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>();
 
@@ -303,6 +307,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
     // --------------------------------------------------------------------------------------------
 
     public DefaultExecutionGraph(
+            JobType jobType,
             JobInformation jobInformation,
             ScheduledExecutorService futureExecutor,
             Executor ioExecutor,
@@ -324,6 +329,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
             MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
             TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory) {
 
+        this.jobType = jobType;
         this.executionGraphId = new ExecutionGraphID();
 
         this.jobInformation = checkNotNull(jobInformation);
@@ -636,6 +642,11 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
         return state;
     }
 
+    @Override
+    public JobType getJobType() {
+        return jobType;
+    }
+
     @Override
     public Throwable getFailureCause() {
         return failureCause;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index a31077ffee4..7e41af896df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -141,6 +141,7 @@ public class DefaultExecutionGraphBuilder {
         // create a new execution graph, if none exists so far
         final DefaultExecutionGraph executionGraph =
                 new DefaultExecutionGraph(
+                        jobGraph.getJobType(),
                         jobInformation,
                         futureExecutor,
                         ioExecutor,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java
index cbd7920e69d..50fade80fa7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess;
 import org.apache.flink.runtime.jobmaster.JobMasterServiceProcess;
@@ -33,6 +34,7 @@ public class DefaultJobMasterServiceProcessFactory implements JobMasterServicePr
 
     private final JobID jobId;
     private final String jobName;
+    private final JobType jobType;
     @Nullable private final JobCheckpointingSettings checkpointingSettings;
     private final long initializationTimestamp;
 
@@ -41,11 +43,13 @@ public class DefaultJobMasterServiceProcessFactory implements JobMasterServicePr
     public DefaultJobMasterServiceProcessFactory(
             JobID jobId,
             String jobName,
+            JobType jobType,
             @Nullable JobCheckpointingSettings checkpointingSettings,
             long initializationTimestamp,
             JobMasterServiceFactory jobMasterServiceFactory) {
         this.jobId = jobId;
         this.jobName = jobName;
+        this.jobType = jobType;
         this.checkpointingSettings = checkpointingSettings;
         this.initializationTimestamp = initializationTimestamp;
         this.jobMasterServiceFactory = jobMasterServiceFactory;
@@ -69,6 +73,12 @@ public class DefaultJobMasterServiceProcessFactory implements JobMasterServicePr
     public ArchivedExecutionGraph createArchivedExecutionGraph(
             JobStatus jobStatus, @Nullable Throwable cause) {
         return ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                jobId, jobName, jobStatus, cause, checkpointingSettings, initializationTimestamp);
+                jobId,
+                jobName,
+                jobStatus,
+                jobType,
+                cause,
+                checkpointingSettings,
+                initializationTimestamp);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4c6d41d271a..3b13597ee01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -999,6 +999,7 @@ public class AdaptiveScheduler
                 jobInformation.getJobID(),
                 jobInformation.getName(),
                 jobStatus,
+                jobGraph.getJobType(),
                 cause,
                 jobInformation.getCheckpointingSettings(),
                 initializationTimestamp,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 122441459d8..bd7f18f3ba9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -769,6 +769,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                                                                             JobStatus.RUNNING,
                                                                             null,
                                                                             null,
+                                                                            null,
                                                                             1337))))
                             .build();
             testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index e7bfc30ab17..cf8db60cb8a 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
@@ -600,6 +601,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                                         jobId,
                                         jobGraph.getName(),
                                         JobStatus.FAILED,
+                                        null,
                                         testFailure,
                                         jobGraph.getCheckpointingSettings(),
                                         1L)),
@@ -852,6 +854,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                                                                 jobId,
                                                                 jobGraph.getName(),
                                                                 JobStatus.FAILED,
+                                                                null,
                                                                 actualError,
                                                                 jobGraph.getCheckpointingSettings(),
                                                                 1L)),
@@ -1598,6 +1601,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                                                         jobGraph.getJobID(),
                                                         jobGraph.getName(),
                                                         JobStatus.RUNNING,
+                                                        JobType.STREAMING,
                                                         null,
                                                         null,
                                                         System.currentTimeMillis(),
@@ -1678,6 +1682,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                     new DefaultJobMasterServiceProcessFactory(
                             jobGraph.getJobID(),
                             jobGraph.getName(),
+                            jobGraph.getJobType(),
                             jobGraph.getCheckpointingSettings(),
                             initializationTimestamp,
                             jobMasterServiceFactory),
@@ -1743,6 +1748,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                     new DefaultJobMasterServiceProcessFactory(
                             jobGraph.getJobID(),
                             jobGraph.getName(),
+                            jobGraph.getJobType(),
                             jobGraph.getCheckpointingSettings(),
                             initializationTimestamp,
                             new TestingJobMasterServiceFactory(
@@ -1803,6 +1809,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                     new DefaultJobMasterServiceProcessFactory(
                             jobGraph.getJobID(),
                             jobGraph.getName(),
+                            jobGraph.getJobType(),
                             jobGraph.getCheckpointingSettings(),
                             initializationTimestamp,
                             new TestingJobMasterServiceFactory()),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 747e2764df7..f91b53a2c67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -159,6 +160,7 @@ public class ArchivedExecutionGraphTest {
                         new JobID(),
                         "TestJob",
                         JobStatus.SUSPENDED,
+                        JobType.STREAMING,
                         new Exception("Test suspension exception"),
                         null,
                         System.currentTimeMillis());
@@ -177,6 +179,7 @@ public class ArchivedExecutionGraphTest {
                         new JobID(),
                         "TestJob",
                         JobStatus.INITIALIZING,
+                        JobType.STREAMING,
                         null,
                         new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null),
                         System.currentTimeMillis());
@@ -210,6 +213,7 @@ public class ArchivedExecutionGraphTest {
                         new JobID(),
                         "TestJob",
                         JobStatus.INITIALIZING,
+                        JobType.STREAMING,
                         null,
                         null,
                         System.currentTimeMillis(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java
index 792dd085f31..9d1addca44c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java
@@ -51,7 +51,7 @@ class DefaultJobMasterServiceProcessTest {
             failedArchivedExecutionGraphFactory =
                     (throwable ->
                             ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                                    jobId, "test", JobStatus.FAILED, throwable, null, 1337));
+                                    jobId, "test", JobStatus.FAILED, null, throwable, null, 1337));
 
     @Test
     void testInitializationFailureCompletesResultFuture() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index ee5cbcecc1a..20fa28e3e4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -253,6 +253,7 @@ class JobMasterServiceLeadershipRunnerTest {
                         jobGraph.getJobID(),
                         jobGraph.getName(),
                         JobStatus.FAILED,
+                        jobGraph.getJobType(),
                         testException,
                         null,
                         1L));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 24c345f7355..9bccd4324c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
@@ -70,7 +71,13 @@ public class TestingJobManagerRunner implements JobManagerRunner {
         final ExecutionGraphInfo suspendedExecutionGraphInfo =
                 new ExecutionGraphInfo(
                         ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                                jobId, "TestJob", JobStatus.SUSPENDED, null, null, 0L),
+                                jobId,
+                                "TestJob",
+                                JobStatus.SUSPENDED,
+                                JobType.STREAMING,
+                                null,
+                                null,
+                                0L),
                         null);
         terminationFuture.whenComplete(
                 (ignored, ignoredThrowable) ->
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java
index 40aa709bf57..4fc02ae5f4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobmaster.JobMasterServiceProcess;
 import org.apache.flink.runtime.jobmaster.TestingJobMasterServiceProcess;
 
@@ -65,7 +66,7 @@ public class TestingJobMasterServiceProcessFactory implements JobMasterServicePr
     public ArchivedExecutionGraph createArchivedExecutionGraph(
             JobStatus jobStatus, @Nullable Throwable cause) {
         return ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                jobId, jobName, jobStatus, cause, null, initializationTimestamp);
+                jobId, jobName, jobStatus, JobType.STREAMING, cause, null, initializationTimestamp);
     }
 
     public static Builder newBuilder() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java
index 2f316be42b6..65be4e14577 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -73,7 +74,13 @@ public class TestingJobMasterServiceProcessFactoryOld implements JobMasterServic
     public ArchivedExecutionGraph createArchivedExecutionGraph(
             JobStatus jobStatus, @Nullable Throwable cause) {
         return ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                jobId, "test-job", jobStatus, cause, null, System.currentTimeMillis());
+                jobId,
+                "test-job",
+                jobStatus,
+                JobType.STREAMING,
+                cause,
+                null,
+                System.currentTimeMillis());
     }
 
     public static class TestingFutureJobMasterServiceFactory implements JobMasterServiceFactory {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
index 32e567e4f65..58255a5b6dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
@@ -146,6 +147,7 @@ public class ArchivedExecutionGraphBuilder {
                         : new ArrayList<>(tasks.values()),
                 stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
                 state != null ? state : JobStatus.FINISHED,
+                JobType.STREAMING,
                 failureCause,
                 jsonPlan != null
                         ? jsonPlan
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java
index b743eedfe39..7e98868ee26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
@@ -40,11 +41,14 @@ public class ExecutionGraphInfoTest {
                         new JobID(),
                         "test job name",
                         JobStatus.FAILED,
+                        JobType.STREAMING,
                         new RuntimeException("Expected RuntimeException"),
                         null,
                         System.currentTimeMillis());
 
         final ExecutionGraphInfo executionGraphInfo = new ExecutionGraphInfo(executionGraph);
+        assertThat(executionGraphInfo.getArchivedExecutionGraph().getJobType())
+                .isEqualTo(JobType.STREAMING);
 
         final ErrorInfo failureInfo =
                 executionGraphInfo.getArchivedExecutionGraph().getFailureInfo();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index 99faf87068e..f035fb9d004 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.ResultPartitionAvailabilityChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -88,6 +89,7 @@ class StateTrackingMockExecutionGraph implements ExecutionGraph {
             LoggerFactory.getLogger(StateTrackingMockExecutionGraph.class);
 
     private JobStatus state = JobStatus.INITIALIZING;
+    private JobType jobType = JobType.STREAMING;
     private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>();
     private final JobID jobId = new JobID();
     private static final ArchivedExecutionConfig archivedExecutionConfig =
@@ -124,6 +126,11 @@ class StateTrackingMockExecutionGraph implements ExecutionGraph {
         return state;
     }
 
+    @Override
+    public JobType getJobType() {
+        return jobType;
+    }
+
     @Override
     public CompletableFuture<JobStatus> getTerminationFuture() {
         return terminationFuture;


Reply via email to