This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5263f9cbf20536e9a81a0044f22b033e78a908ea Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Wed Apr 24 17:25:08 2024 +0800 [FLINK-35222][rest] Add getJobType for AccessExecutionGraph --- .../apache/flink/runtime/dispatcher/Dispatcher.java | 1 + .../JobMasterServiceLeadershipRunnerFactory.java | 1 + .../cleanup/CheckpointResourcesCleanupRunner.java | 1 + .../runtime/executiongraph/AccessExecutionGraph.java | 9 +++++++++ .../runtime/executiongraph/ArchivedExecutionGraph.java | 18 ++++++++++++++++++ .../runtime/executiongraph/DefaultExecutionGraph.java | 11 +++++++++++ .../executiongraph/DefaultExecutionGraphBuilder.java | 1 + .../DefaultJobMasterServiceProcessFactory.java | 12 +++++++++++- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 1 + .../dispatcher/DispatcherResourceCleanupTest.java | 1 + .../flink/runtime/dispatcher/DispatcherTest.java | 7 +++++++ .../executiongraph/ArchivedExecutionGraphTest.java | 4 ++++ .../jobmaster/DefaultJobMasterServiceProcessTest.java | 2 +- .../JobMasterServiceLeadershipRunnerTest.java | 1 + .../runtime/jobmaster/TestingJobManagerRunner.java | 9 ++++++++- .../TestingJobMasterServiceProcessFactory.java | 3 ++- .../TestingJobMasterServiceProcessFactoryOld.java | 9 ++++++++- .../legacy/utils/ArchivedExecutionGraphBuilder.java | 2 ++ .../runtime/scheduler/ExecutionGraphInfoTest.java | 4 ++++ .../adaptive/StateTrackingMockExecutionGraph.java | 7 +++++++ 20 files changed, 99 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 5b89203095e..e138147d97d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -563,6 +563,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> jobId, jobName, JobStatus.FAILED, + null, exception, null, System.currentTimeMillis()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java index bf7c69fb077..718db104ed9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java @@ -118,6 +118,7 @@ public enum JobMasterServiceLeadershipRunnerFactory implements JobManagerRunnerF new DefaultJobMasterServiceProcessFactory( jobGraph.getJobID(), jobGraph.getName(), + jobGraph.getJobType(), jobGraph.getCheckpointingSettings(), initializationTimestamp, jobMasterServiceFactory); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java index 9bd8a1b499e..59128d853b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java @@ -226,6 +226,7 @@ public class CheckpointResourcesCleanupRunner implements JobManagerRunner { jobResult.getJobId(), "unknown", getJobStatus(jobResult), + null, jobResult.getSerializedThrowable().orElse(null), null, initializationTimestamp)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index 011e864a4fd..e9998b1a78f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.util.OptionalFailure; @@ -67,6 +68,14 @@ public interface AccessExecutionGraph extends JobStatusProvider { */ JobStatus getState(); + /** + * Returns the {@link JobType} for this execution graph. + * + * @return job type for this execution graph. It may be null when an exception occurs. + */ + @Nullable + JobType getJobType(); + /** * Returns the exception that caused the job to fail. This is the first root exception that was * not recoverable and triggered job failure. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index c0f25b870c6..3fb0a09fde1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; @@ -83,6 +84,9 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl /** Current status of the job execution. */ private final JobStatus state; + /** The job type of the job execution. */ + @Nullable private final JobType jobType; + /** * The exception that caused the job to fail. This is set to the first root exception that was * not recoverable and triggered job failure @@ -115,6 +119,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl List<ArchivedExecutionJobVertex> verticesInCreationOrder, long[] stateTimestamps, JobStatus state, + @Nullable JobType jobType, @Nullable ErrorInfo failureCause, String jsonPlan, StringifiedAccumulatorResult[] archivedUserAccumulators, @@ -134,6 +139,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl this.verticesInCreationOrder = Preconditions.checkNotNull(verticesInCreationOrder); this.stateTimestamps = Preconditions.checkNotNull(stateTimestamps); this.state = Preconditions.checkNotNull(state); + this.jobType = jobType; this.failureCause = failureCause; this.jsonPlan = Preconditions.checkNotNull(jsonPlan); this.archivedUserAccumulators = Preconditions.checkNotNull(archivedUserAccumulators); @@ -170,6 +176,11 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl return state; } + @Override + public JobType getJobType() { + return jobType; + } + @Nullable @Override public ErrorInfo getFailureInfo() { @@ -342,6 +353,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl archivedVerticesInCreationOrder, timestamps, statusOverride == null ? executionGraph.getState() : statusOverride, + executionGraph.getJobType(), executionGraph.getFailureInfo(), executionGraph.getJsonPlan(), executionGraph.getAccumulatorResultsStringified(), @@ -364,6 +376,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl JobID jobId, String jobName, JobStatus jobStatus, + @Nullable JobType jobType, @Nullable Throwable throwable, @Nullable JobCheckpointingSettings checkpointingSettings, long initializationTimestamp) { @@ -371,6 +384,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl jobId, jobName, jobStatus, + jobType, Collections.emptyMap(), Collections.emptyList(), throwable, @@ -382,6 +396,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl JobID jobId, String jobName, JobStatus jobStatus, + JobType jobType, @Nullable Throwable throwable, @Nullable JobCheckpointingSettings checkpointingSettings, long initializationTimestamp, @@ -411,6 +426,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl jobId, jobName, jobStatus, + jobType, archivedJobVertices, archivedVerticesInCreationOrder, throwable, @@ -422,6 +438,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl JobID jobId, String jobName, JobStatus jobStatus, + JobType jobType, Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks, List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder, @Nullable Throwable throwable, @@ -453,6 +470,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl archivedVerticesInCreationOrder, timestamps, jobStatus, + jobType, failureInfo, jsonPlan, archivedUserAccumulators, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 1a853d851b6..0fb840a25a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -56,6 +56,7 @@ import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -220,6 +221,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG /** Current status of the job execution. */ private volatile JobStatus state = JobStatus.CREATED; + /** The job type of the job execution. */ + private final JobType jobType; + /** A future that completes once the job has reached a terminal state. */ private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>(); @@ -303,6 +307,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG // -------------------------------------------------------------------------------------------- public DefaultExecutionGraph( + JobType jobType, JobInformation jobInformation, ScheduledExecutorService futureExecutor, Executor ioExecutor, @@ -324,6 +329,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG MarkPartitionFinishedStrategy markPartitionFinishedStrategy, TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory) { + this.jobType = jobType; this.executionGraphId = new ExecutionGraphID(); this.jobInformation = checkNotNull(jobInformation); @@ -636,6 +642,11 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG return state; } + @Override + public JobType getJobType() { + return jobType; + } + @Override public Throwable getFailureCause() { return failureCause; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index a31077ffee4..7e41af896df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -141,6 +141,7 @@ public class DefaultExecutionGraphBuilder { // create a new execution graph, if none exists so far final DefaultExecutionGraph executionGraph = new DefaultExecutionGraph( + jobGraph.getJobType(), jobInformation, futureExecutor, ioExecutor, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java index cbd7920e69d..50fade80fa7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess; import org.apache.flink.runtime.jobmaster.JobMasterServiceProcess; @@ -33,6 +34,7 @@ public class DefaultJobMasterServiceProcessFactory implements JobMasterServicePr private final JobID jobId; private final String jobName; + private final JobType jobType; @Nullable private final JobCheckpointingSettings checkpointingSettings; private final long initializationTimestamp; @@ -41,11 +43,13 @@ public class DefaultJobMasterServiceProcessFactory implements JobMasterServicePr public DefaultJobMasterServiceProcessFactory( JobID jobId, String jobName, + JobType jobType, @Nullable JobCheckpointingSettings checkpointingSettings, long initializationTimestamp, JobMasterServiceFactory jobMasterServiceFactory) { this.jobId = jobId; this.jobName = jobName; + this.jobType = jobType; this.checkpointingSettings = checkpointingSettings; this.initializationTimestamp = initializationTimestamp; this.jobMasterServiceFactory = jobMasterServiceFactory; @@ -69,6 +73,12 @@ public class DefaultJobMasterServiceProcessFactory implements JobMasterServicePr public ArchivedExecutionGraph createArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobId, jobName, jobStatus, cause, checkpointingSettings, initializationTimestamp); + jobId, + jobName, + jobStatus, + jobType, + cause, + checkpointingSettings, + initializationTimestamp); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 4c6d41d271a..3b13597ee01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -999,6 +999,7 @@ public class AdaptiveScheduler jobInformation.getJobID(), jobInformation.getName(), jobStatus, + jobGraph.getJobType(), cause, jobInformation.getCheckpointingSettings(), initializationTimestamp, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 122441459d8..bd7f18f3ba9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -769,6 +769,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { JobStatus.RUNNING, null, null, + null, 1337)))) .build(); testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index e7bfc30ab17..cf8db60cb8a 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobManagerRunner; @@ -600,6 +601,7 @@ public class DispatcherTest extends AbstractDispatcherTest { jobId, jobGraph.getName(), JobStatus.FAILED, + null, testFailure, jobGraph.getCheckpointingSettings(), 1L)), @@ -852,6 +854,7 @@ public class DispatcherTest extends AbstractDispatcherTest { jobId, jobGraph.getName(), JobStatus.FAILED, + null, actualError, jobGraph.getCheckpointingSettings(), 1L)), @@ -1598,6 +1601,7 @@ public class DispatcherTest extends AbstractDispatcherTest { jobGraph.getJobID(), jobGraph.getName(), JobStatus.RUNNING, + JobType.STREAMING, null, null, System.currentTimeMillis(), @@ -1678,6 +1682,7 @@ public class DispatcherTest extends AbstractDispatcherTest { new DefaultJobMasterServiceProcessFactory( jobGraph.getJobID(), jobGraph.getName(), + jobGraph.getJobType(), jobGraph.getCheckpointingSettings(), initializationTimestamp, jobMasterServiceFactory), @@ -1743,6 +1748,7 @@ public class DispatcherTest extends AbstractDispatcherTest { new DefaultJobMasterServiceProcessFactory( jobGraph.getJobID(), jobGraph.getName(), + jobGraph.getJobType(), jobGraph.getCheckpointingSettings(), initializationTimestamp, new TestingJobMasterServiceFactory( @@ -1803,6 +1809,7 @@ public class DispatcherTest extends AbstractDispatcherTest { new DefaultJobMasterServiceProcessFactory( jobGraph.getJobID(), jobGraph.getName(), + jobGraph.getJobType(), jobGraph.getCheckpointingSettings(), initializationTimestamp, new TestingJobMasterServiceFactory()), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 747e2764df7..f91b53a2c67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -159,6 +160,7 @@ public class ArchivedExecutionGraphTest { new JobID(), "TestJob", JobStatus.SUSPENDED, + JobType.STREAMING, new Exception("Test suspension exception"), null, System.currentTimeMillis()); @@ -177,6 +179,7 @@ public class ArchivedExecutionGraphTest { new JobID(), "TestJob", JobStatus.INITIALIZING, + JobType.STREAMING, null, new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null), System.currentTimeMillis()); @@ -210,6 +213,7 @@ public class ArchivedExecutionGraphTest { new JobID(), "TestJob", JobStatus.INITIALIZING, + JobType.STREAMING, null, null, System.currentTimeMillis(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java index 792dd085f31..9d1addca44c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java @@ -51,7 +51,7 @@ class DefaultJobMasterServiceProcessTest { failedArchivedExecutionGraphFactory = (throwable -> ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobId, "test", JobStatus.FAILED, throwable, null, 1337)); + jobId, "test", JobStatus.FAILED, null, throwable, null, 1337)); @Test void testInitializationFailureCompletesResultFuture() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index ee5cbcecc1a..20fa28e3e4e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -253,6 +253,7 @@ class JobMasterServiceLeadershipRunnerTest { jobGraph.getJobID(), jobGraph.getName(), JobStatus.FAILED, + jobGraph.getJobType(), testException, null, 1L)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java index 24c345f7355..9bccd4324c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; @@ -70,7 +71,13 @@ public class TestingJobManagerRunner implements JobManagerRunner { final ExecutionGraphInfo suspendedExecutionGraphInfo = new ExecutionGraphInfo( ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobId, "TestJob", JobStatus.SUSPENDED, null, null, 0L), + jobId, + "TestJob", + JobStatus.SUSPENDED, + JobType.STREAMING, + null, + null, + 0L), null); terminationFuture.whenComplete( (ignored, ignoredThrowable) -> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java index 40aa709bf57..4fc02ae5f4f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobmaster.JobMasterServiceProcess; import org.apache.flink.runtime.jobmaster.TestingJobMasterServiceProcess; @@ -65,7 +66,7 @@ public class TestingJobMasterServiceProcessFactory implements JobMasterServicePr public ArchivedExecutionGraph createArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobId, jobName, jobStatus, cause, null, initializationTimestamp); + jobId, jobName, jobStatus, JobType.STREAMING, cause, null, initializationTimestamp); } public static Builder newBuilder() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java index 2f316be42b6..65be4e14577 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess; import org.apache.flink.runtime.jobmaster.JobMaster; @@ -73,7 +74,13 @@ public class TestingJobMasterServiceProcessFactoryOld implements JobMasterServic public ArchivedExecutionGraph createArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobId, "test-job", jobStatus, cause, null, System.currentTimeMillis()); + jobId, + "test-job", + jobStatus, + JobType.STREAMING, + cause, + null, + System.currentTimeMillis()); } public static class TestingFutureJobMasterServiceFactory implements JobMasterServiceFactory { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java index 32e567e4f65..58255a5b6dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.Preconditions; @@ -146,6 +147,7 @@ public class ArchivedExecutionGraphBuilder { : new ArrayList<>(tasks.values()), stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length], state != null ? state : JobStatus.FINISHED, + JobType.STREAMING, failureCause, jsonPlan != null ? jsonPlan diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java index b743eedfe39..7e98868ee26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables; @@ -40,11 +41,14 @@ public class ExecutionGraphInfoTest { new JobID(), "test job name", JobStatus.FAILED, + JobType.STREAMING, new RuntimeException("Expected RuntimeException"), null, System.currentTimeMillis()); final ExecutionGraphInfo executionGraphInfo = new ExecutionGraphInfo(executionGraph); + assertThat(executionGraphInfo.getArchivedExecutionGraph().getJobType()) + .isEqualTo(JobType.STREAMING); final ErrorInfo failureInfo = executionGraphInfo.getArchivedExecutionGraph().getFailureInfo(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java index 99faf87068e..f035fb9d004 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.ResultPartitionAvailabilityChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; @@ -88,6 +89,7 @@ class StateTrackingMockExecutionGraph implements ExecutionGraph { LoggerFactory.getLogger(StateTrackingMockExecutionGraph.class); private JobStatus state = JobStatus.INITIALIZING; + private JobType jobType = JobType.STREAMING; private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>(); private final JobID jobId = new JobID(); private static final ArchivedExecutionConfig archivedExecutionConfig = @@ -124,6 +126,11 @@ class StateTrackingMockExecutionGraph implements ExecutionGraph { return state; } + @Override + public JobType getJobType() { + return jobType; + } + @Override public CompletableFuture<JobStatus> getTerminationFuture() { return terminationFuture;