This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 191b9dff2f3faf281a77e211c6ef47243d6a9e8d
Author: lamber-ken <[email protected]>
AuthorDate: Wed Apr 24 17:36:28 2019 +0800

    [FLINK-12247][rest] Fix NPE when writing the archive json file to FileSystem
    
    This closes #8250.
---
 .../job/SubtaskExecutionAttemptAccumulatorsHandler.java  | 16 +++++++++-------
 .../job/SubtaskExecutionAttemptDetailsHandler.java       | 16 +++++++++-------
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index 69688cb..97da25a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -100,13 +100,15 @@ public class SubtaskExecutionAttemptAccumulatorsHandler
 
                                for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
                                        AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
-                                       ResponseBody json = 
createAccumulatorInfo(attempt);
-                                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
-                                               .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
-                                               .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
-                                               .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
-                                               .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
-                                       archive.add(new ArchivedJson(path, 
json));
+                                       if (attempt != null){
+                                               ResponseBody json = 
createAccumulatorInfo(attempt);
+                                               String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                                                       .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
+                                                       .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                                       .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
                                }
                        }
                }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index c538606..75fd100 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -114,13 +114,15 @@ public class SubtaskExecutionAttemptDetailsHandler
 
                                for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
                                        AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
-                                       ResponseBody json = 
createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
-                                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
-                                               .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
-                                               .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
-                                               .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
-                                               .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
-                                       archive.add(new ArchivedJson(path, 
json));
+                                       if (attempt != null) {
+                                               ResponseBody json = 
createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
+                                               String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                                                       .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
+                                                       .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                                       .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
                                }
                        }
                }

Reply via email to