This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 767fe15 [FLINK-12247][rest] Fix NPE when writing the archive json file to FileSystem
767fe15 is described below
commit 767fe152cb69a204261a0770412c8b28d037614d
Author: lamber-ken <[email protected]>
AuthorDate: Wed Apr 24 17:36:28 2019 +0800
[FLINK-12247][rest] Fix NPE when writing the archive json file to FileSystem
This closes #8250.
---
.../job/SubtaskExecutionAttemptAccumulatorsHandler.java | 16 +++++++++-------
.../job/SubtaskExecutionAttemptDetailsHandler.java | 16 +++++++++-------
2 files changed, 18 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index 69688cb..97da25a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -100,13 +100,15 @@ public class SubtaskExecutionAttemptAccumulatorsHandler
for (int x = 0; x <
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
AccessExecution attempt =
subtask.getPriorExecutionAttempt(x);
- ResponseBody json =
createAccumulatorInfo(attempt);
- String path =
getMessageHeaders().getTargetRestEndpointURL()
- .replace(':' +
JobIDPathParameter.KEY, graph.getJobID().toString())
- .replace(':' +
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
- .replace(':' +
SubtaskIndexPathParameter.KEY,
String.valueOf(subtask.getParallelSubtaskIndex()))
- .replace(':' +
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
- archive.add(new ArchivedJson(path,
json));
+ if (attempt != null){
+ ResponseBody json =
createAccumulatorInfo(attempt);
+ String path =
getMessageHeaders().getTargetRestEndpointURL()
+ .replace(':' +
JobIDPathParameter.KEY, graph.getJobID().toString())
+ .replace(':' +
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+ .replace(':' +
SubtaskIndexPathParameter.KEY,
String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(':' +
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+ archive.add(new
ArchivedJson(path, json));
+ }
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index c538606..75fd100 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -114,13 +114,15 @@ public class SubtaskExecutionAttemptDetailsHandler
for (int x = 0; x <
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
AccessExecution attempt =
subtask.getPriorExecutionAttempt(x);
- ResponseBody json =
createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
- String path =
getMessageHeaders().getTargetRestEndpointURL()
- .replace(':' +
JobIDPathParameter.KEY, graph.getJobID().toString())
- .replace(':' +
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
- .replace(':' +
SubtaskIndexPathParameter.KEY,
String.valueOf(subtask.getParallelSubtaskIndex()))
- .replace(':' +
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
- archive.add(new ArchivedJson(path,
json));
+ if (attempt != null) {
+ ResponseBody json =
createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
+ String path =
getMessageHeaders().getTargetRestEndpointURL()
+ .replace(':' +
JobIDPathParameter.KEY, graph.getJobID().toString())
+ .replace(':' +
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+ .replace(':' +
SubtaskIndexPathParameter.KEY,
String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(':' +
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+ archive.add(new
ArchivedJson(path, json));
+ }
}
}
}