This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 191b9dff2f3faf281a77e211c6ef47243d6a9e8d Author: lamber-ken <[email protected]> AuthorDate: Wed Apr 24 17:36:28 2019 +0800 [FLINK-12247][rest] Fix NPE when writing the archive json file to FileSystem This closes #8250. --- .../job/SubtaskExecutionAttemptAccumulatorsHandler.java | 16 +++++++++------- .../job/SubtaskExecutionAttemptDetailsHandler.java | 16 +++++++++------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java index 69688cb..97da25a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -100,13 +100,15 @@ public class SubtaskExecutionAttemptAccumulatorsHandler for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { AccessExecution attempt = subtask.getPriorExecutionAttempt(x); - ResponseBody json = createAccumulatorInfo(attempt); - String path = getMessageHeaders().getTargetRestEndpointURL() - .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) - .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) - .replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) - .replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber())); - archive.add(new ArchivedJson(path, json)); + if (attempt != null){ + ResponseBody json = createAccumulatorInfo(attempt); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) + .replace(':' 
+ SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java index c538606..75fd100 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java @@ -114,13 +114,15 @@ public class SubtaskExecutionAttemptDetailsHandler for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { AccessExecution attempt = subtask.getPriorExecutionAttempt(x); - ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null); - String path = getMessageHeaders().getTargetRestEndpointURL() - .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) - .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) - .replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) - .replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber())); - archive.add(new ArchivedJson(path, json)); + if (attempt != null) { + ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) + .replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(':' + SubtaskAttemptPathParameter.KEY, 
String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } } } }
