This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b6e0074d7a5b34d998f71960bff7e4a6db45d7b0 Author: Matthias Pohl <[email protected]> AuthorDate: Sun Feb 28 17:45:29 2021 +0100 [FLINK-21190][runtime] Refactors JsonArchivist interface --- .../rest/handler/job/JobAccumulatorsHandler.java | 4 ++-- .../runtime/rest/handler/job/JobConfigHandler.java | 4 ++-- .../runtime/rest/handler/job/JobDetailsHandler.java | 4 ++-- .../rest/handler/job/JobExceptionsHandler.java | 4 ++-- .../runtime/rest/handler/job/JobPlanHandler.java | 4 ++-- .../rest/handler/job/JobVertexDetailsHandler.java | 4 ++-- .../handler/job/JobVertexTaskManagersHandler.java | 4 ++-- .../rest/handler/job/JobsOverviewHandler.java | 4 ++-- .../SubtaskExecutionAttemptAccumulatorsHandler.java | 4 ++-- .../job/SubtaskExecutionAttemptDetailsHandler.java | 4 ++-- .../rest/handler/job/SubtasksTimesHandler.java | 4 ++-- .../job/checkpoints/CheckpointConfigHandler.java | 4 ++-- .../CheckpointStatisticDetailsHandler.java | 4 ++-- .../checkpoints/CheckpointingStatisticsHandler.java | 4 ++-- .../TaskCheckpointStatisticDetailsHandler.java | 4 ++-- .../runtime/webmonitor/WebMonitorEndpoint.java | 6 +++--- .../runtime/webmonitor/history/JsonArchivist.java | 21 ++------------------- ...st.java => OnlyExecutionGraphJsonArchivist.java} | 21 ++++----------------- 18 files changed, 39 insertions(+), 69 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java index 70dd6ba..b1ace61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.SerializedValue; @@ -50,7 +50,7 @@ import java.util.concurrent.Executor; public class JobAccumulatorsHandler extends AbstractAccessExecutionGraphHandler< JobAccumulatorsInfo, JobAccumulatorsMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public JobAccumulatorsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java index 53ee2b6..18cc63c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; @@ -43,7 +43,7 @@ import java.util.concurrent.Executor; /** Handler serving the job configuration. */ public class JobConfigHandler extends AbstractAccessExecutionGraphHandler<JobConfigInfo, JobMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public JobConfigHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index ffadb7d..8ef2e51 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -40,7 +40,7 @@ import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; @@ -57,7 +57,7 @@ import java.util.concurrent.Executor; /** Handler returning the details for the specified job. */ public class JobDetailsHandler extends AbstractAccessExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { private final MetricFetcher metricFetcher; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index 1fa496e..bb817df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; @@ -51,7 +51,7 @@ import java.util.concurrent.Executor; public class JobExceptionsHandler extends AbstractAccessExecutionGraphHandler< JobExceptionsInfo, JobExceptionsMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java index fcf48ba..0726fca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; @@ -42,7 +42,7 @@ import java.util.concurrent.Executor; /** Handler serving the job execution plan. */ public class JobPlanHandler extends AbstractAccessExecutionGraphHandler<JobPlanInfo, JobMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public JobPlanHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java index 2c62314..7186704 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import javax.annotation.Nullable; @@ -55,7 +55,7 @@ import java.util.concurrent.Executor; public class JobVertexDetailsHandler extends AbstractAccessExecutionGraphHandler< JobVertexDetailsInfo, JobVertexMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { private final MetricFetcher metricFetcher; public JobVertexDetailsHandler( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java index 1e38e29..672576f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; @@ -64,7 +64,7 @@ import java.util.concurrent.Executor; public class JobVertexTaskManagersHandler extends AbstractAccessExecutionGraphHandler< JobVertexTaskManagersInfo, JobVertexMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { private MetricFetcher metricFetcher; public JobVertexTaskManagersHandler( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java index 5efd321..2448354 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import javax.annotation.Nonnull; @@ -47,7 +47,7 @@ import java.util.concurrent.CompletableFuture; public class JobsOverviewHandler extends AbstractRestHandler< RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public JobsOverviewHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java index 979d13b..3bf04ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumul import org.apache.flink.runtime.rest.messages.job.UserAccumulator; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; @@ -53,7 +53,7 @@ import java.util.concurrent.Executor; public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptHandler< SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { /** * Instantiates a new Abstract job vertex handler. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java index bc85fc3..0f6370c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java @@ -40,7 +40,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; @@ -55,7 +55,7 @@ import java.util.concurrent.Executor; public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptHandler< SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { private final MetricFetcher metricFetcher; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java index e02611f..ff4507f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; @@ -49,7 +49,7 @@ import java.util.concurrent.Executor; /** Request handler for the subtasks times info. */ public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public SubtasksTimesHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java index 7684271..7ca689c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java @@ -36,7 +36,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeader import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; @@ -50,7 +50,7 @@ import java.util.concurrent.Executor; /** Handler which serves the checkpoint configuration. */ public class CheckpointConfigHandler extends AbstractAccessExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public CheckpointConfigHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java index 3b8dee0..37d7f81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParam import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; @@ -48,7 +48,7 @@ import java.util.concurrent.Executor; /** REST handler which returns the details for a checkpoint. */ public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public CheckpointStatisticDetailsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java index db6f79c..e415ccd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; @@ -57,7 +57,7 @@ import java.util.concurrent.Executor; /** Handler which serves the checkpoint statistics. */ public class CheckpointingStatisticsHandler extends AbstractAccessExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public CheckpointingStatisticsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java index d512c58..3681e65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageP import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; @@ -58,7 +58,7 @@ import java.util.concurrent.Executor; public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHandler< TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> - implements JsonArchivist { + implements OnlyExecutionGraphJsonArchivist { public TaskCheckpointStatisticDetailsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 4978426..e2b1783 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -129,6 +128,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileH import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -952,11 +952,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp } @Override - public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) + public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo) throws IOException { Collection<ArchivedJson> archivedJson = new ArrayList<>(archivingHandlers.size()); for (JsonArchivist archivist : archivingHandlers) { - Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(graph); + Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(executionGraphInfo); archivedJson.addAll(subArchive); } return archivedJson; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java index e1da5d0..c1b759b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java @@ -38,29 +38,12 @@ public interface JsonArchivist { * given job, for example one entry for each task. The REST URLs should be unique and must not * contain placeholders. * - * @param graph AccessExecutionGraph for which the responses should be generated - * @return Collection containing an ArchivedJson for every response that could be generated for - * the given job - * @throws IOException thrown if the JSON generation fails - */ - Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException; - - /** - * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their - * respective REST URL for a given job. - * - * <p>The collection should contain one entry for every response that could be generated for the - * given job, for example one entry for each task. The REST URLs should be unique and must not - * contain placeholders. - * * @param executionGraphInfo {@link AccessExecutionGraph}-related information for which the * responses should be generated * @return Collection containing an ArchivedJson for every response that could be generated for * the given job * @throws IOException thrown if the JSON generation fails */ - default Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo) - throws IOException { - return archiveJsonWithPath(executionGraphInfo.getArchivedExecutionGraph()); - } + Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo) + throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/OnlyExecutionGraphJsonArchivist.java similarity index 72% copy from flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/OnlyExecutionGraphJsonArchivist.java index e1da5d0..85c7d62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/OnlyExecutionGraphJsonArchivist.java @@ -25,10 +25,10 @@ import java.io.IOException; import java.util.Collection; /** - * Interface for all classes that want to participate in the archiving of job-related json - * responses. + * Interface for all classes that want to participate in the archiving of job-related json responses + * but only provide {@link AccessExecutionGraph}-related information. */ -public interface JsonArchivist { +public interface OnlyExecutionGraphJsonArchivist extends JsonArchivist { /** * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their @@ -45,20 +45,7 @@ public interface JsonArchivist { */ Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException; - /** - * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their - * respective REST URL for a given job. - * - * <p>The collection should contain one entry for every response that could be generated for the - * given job, for example one entry for each task. The REST URLs should be unique and must not - * contain placeholders. - * - * @param executionGraphInfo {@link AccessExecutionGraph}-related information for which the - * responses should be generated - * @return Collection containing an ArchivedJson for every response that could be generated for - * the given job - * @throws IOException thrown if the JSON generation fails - */ + @Override default Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo) throws IOException { return archiveJsonWithPath(executionGraphInfo.getArchivedExecutionGraph());
