[FLINK-7703] Port JobExceptionsHandler to new REST endpoint. This closes #4834.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb528a11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb528a11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb528a11 Branch: refs/heads/master Commit: cb528a114b4f4bac04620f0dd6aeead773de0d0e Parents: e14f2db Author: zjureel <[email protected]> Authored: Wed Oct 18 11:39:49 2017 +0800 Committer: zentol <[email protected]> Committed: Wed Oct 18 12:51:00 2017 +0200 ---------------------------------------------------------------------- .../dispatcher/DispatcherRestEndpoint.java | 11 ++ .../rest/handler/job/JobExceptionsHandler.java | 100 +++++++++++++ .../rest/messages/JobExceptionsHeaders.java | 70 +++++++++ .../rest/messages/JobExceptionsInfo.java | 141 +++++++++++++++++++ .../messages/JobExceptionsInfoNoRootTest.java | 52 +++++++ .../rest/messages/JobExceptionsInfoTest.java | 52 +++++++ 6 files changed, 426 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index c23bb98..ac4897b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler; import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; +import 
org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; @@ -53,6 +54,7 @@ import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.JobConfigHeaders; +import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobPlanHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; @@ -207,6 +209,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { executor, checkpointStatsCache); + JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler( + restAddressFuture, + leaderRetriever, + timeout, + JobExceptionsHeaders.getInstance(), + executionGraphCache, + executor); + final File tmpDir = restConfiguration.getTmpDir(); Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent; @@ -233,6 +243,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler)); handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler)); handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler)); + handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler)); BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); 
http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java new file mode 100644 index 0000000..feabbea --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Handler serving the job exceptions. + */ +public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> { + + static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; + + public JobExceptionsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + Time timeout, + MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobMessageParameters> messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + + super( + localRestAddress, + leaderRetriever, + timeout, + messageHeaders, + executionGraphCache, + executor); + } + + @Override + protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) { + ErrorInfo rootException = executionGraph.getFailureCause(); + String rootExceptionMessage = null; + Long rootTimestamp = null; + if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + rootExceptionMessage = rootException.getExceptionAsString(); + rootTimestamp = rootException.getTimestamp(); + } + + List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>(); + boolean truncated = false; + for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) { + String t = task.getFailureCauseAsString(); + if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + if (taskExceptionList.size() >= MAX_NUMBER_EXCEPTION_TO_REPORT) { + truncated = true; + break; + } + + TaskManagerLocation location = task.getCurrentAssignedResourceLocation(); + String locationString = location != null ? + location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)"; + long timestamp = task.getStateTimestamp(ExecutionState.FAILED); + taskExceptionList.add(new JobExceptionsInfo.ExecutionExceptionInfo( + t, + task.getTaskNameWithSubtaskIndex(), + locationString, + timestamp == 0 ? 
-1 : timestamp)); + } + } + + return new JobExceptionsInfo(rootExceptionMessage, rootTimestamp, taskExceptionList, truncated); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java new file mode 100644 index 0000000..7b924b3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link JobExceptionsHandler}. 
+ */ +public class JobExceptionsHeaders implements MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobMessageParameters> { + + private static final JobExceptionsHeaders INSTANCE = new JobExceptionsHeaders(); + + public static final String URL = "/jobs/:jobid/exceptions"; + + private JobExceptionsHeaders() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<JobExceptionsInfo> getResponseClass() { + return JobExceptionsInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static JobExceptionsHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java new file mode 100644 index 0000000..83b1134 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobExceptionsHandler}. + */ +public class JobExceptionsInfo implements ResponseBody { + + public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception"; + public static final String FIELD_NAME_TIMESTAMP = "timestamp"; + public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions"; + public static final String FIELD_NAME_TRUNCATED = "truncated"; + + @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) + private final String rootException; + + @JsonProperty(FIELD_NAME_TIMESTAMP) + private final Long rootTimestamp; + + @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) + private final List<ExecutionExceptionInfo> allExceptions; + + @JsonProperty(FIELD_NAME_TRUNCATED) + private final boolean truncated; + + @JsonCreator + public JobExceptionsInfo( + @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException, + @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp, + @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) List<ExecutionExceptionInfo> allExceptions, + @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) { + this.rootException = rootException; + this.rootTimestamp = rootTimestamp; + this.allExceptions = Preconditions.checkNotNull(allExceptions); + this.truncated = truncated; + } + + 
@Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobExceptionsInfo that = (JobExceptionsInfo) o; + return truncated == that.truncated && + Objects.equals(rootException, that.rootException) && + Objects.equals(rootTimestamp, that.rootTimestamp) && + Objects.equals(allExceptions, that.allExceptions); + } + + @Override + public int hashCode() { + return Objects.hash(rootException, rootTimestamp, allExceptions, truncated); + } + + //--------------------------------------------------------------------------------- + // Static helper classes + //--------------------------------------------------------------------------------- + + /** + * Nested class to encapsulate the task execution exception. + */ + public static final class ExecutionExceptionInfo { + public static final String FIELD_NAME_EXCEPTION = "exception"; + public static final String FIELD_NAME_TASK = "task"; + public static final String FIELD_NAME_LOCATION = "location"; + public static final String FIELD_NAME_TIMESTAMP = "timestamp"; + + @JsonProperty(FIELD_NAME_EXCEPTION) + private final String exception; + + @JsonProperty(FIELD_NAME_TASK) + private final String task; + + @JsonProperty(FIELD_NAME_LOCATION) + private final String location; + + @JsonProperty(FIELD_NAME_TIMESTAMP) + private final long timestamp; + + @JsonCreator + public ExecutionExceptionInfo( + @JsonProperty(FIELD_NAME_EXCEPTION) String exception, + @JsonProperty(FIELD_NAME_TASK) String task, + @JsonProperty(FIELD_NAME_LOCATION) String location, + @JsonProperty(FIELD_NAME_TIMESTAMP) long timestamp) { + this.exception = Preconditions.checkNotNull(exception); + this.task = Preconditions.checkNotNull(task); + this.location = Preconditions.checkNotNull(location); + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return 
false; + } + JobExceptionsInfo.ExecutionExceptionInfo that = (JobExceptionsInfo.ExecutionExceptionInfo) o; + return timestamp == that.timestamp && + Objects.equals(exception, that.exception) && + Objects.equals(task, that.task) && + Objects.equals(location, that.location); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, exception, task, location); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java new file mode 100644 index 0000000..a3c23f0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests that the {@link JobExceptionsInfo} with no root exception can be marshalled and unmarshalled. + */ +public class JobExceptionsInfoNoRootTest extends RestResponseMarshallingTestBase<JobExceptionsInfo> { + @Override + protected Class<JobExceptionsInfo> getTestResponseClass() { + return JobExceptionsInfo.class; + } + + @Override + protected JobExceptionsInfo getTestResponseInstance() throws Exception { + List<JobExceptionsInfo.ExecutionExceptionInfo> executionTaskExceptionInfoList = new ArrayList<>(); + executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo( + "exception1", + "task1", + "location1", + System.currentTimeMillis())); + executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo( + "exception2", + "task2", + "location2", + System.currentTimeMillis())); + return new JobExceptionsInfo( + null, + null, + executionTaskExceptionInfoList, + false); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java new file mode 100644 index 0000000..b8f3baa --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests that the {@link JobExceptionsInfo} can be marshalled and unmarshalled. + */ +public class JobExceptionsInfoTest extends RestResponseMarshallingTestBase<JobExceptionsInfo> { + @Override + protected Class<JobExceptionsInfo> getTestResponseClass() { + return JobExceptionsInfo.class; + } + + @Override + protected JobExceptionsInfo getTestResponseInstance() throws Exception { + List<JobExceptionsInfo.ExecutionExceptionInfo> executionTaskExceptionInfoList = new ArrayList<>(); + executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo( + "exception1", + "task1", + "location1", + System.currentTimeMillis())); + executionTaskExceptionInfoList.add(new JobExceptionsInfo.ExecutionExceptionInfo( + "exception2", + "task2", + "location2", + System.currentTimeMillis())); + return new JobExceptionsInfo( + "root exception", + System.currentTimeMillis(), + executionTaskExceptionInfoList, + false); + } +}
