This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 8140395  [FLINK-12247][rest] Fix NPE when writing the archive json file to FileSystem
8140395 is described below

commit 814039588128b1f0f546573339cb218caa32ee9a
Author: lamber-ken <[email protected]>
AuthorDate: Wed Apr 24 17:36:28 2019 +0800

    [FLINK-12247][rest] Fix NPE when writing the archive json file to FileSystem
    
    This closes #8250.
---
 .../job/SubtaskExecutionAttemptAccumulatorsHandler.java  | 16 +++++++++-------
 .../job/SubtaskExecutionAttemptDetailsHandler.java       | 16 +++++++++-------
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index e335238..87d0dab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -103,13 +103,15 @@ public class SubtaskExecutionAttemptAccumulatorsHandler
 
                                for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
                                        AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
-                                       ResponseBody json = 
createAccumulatorInfo(attempt);
-                                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
-                                               .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
-                                               .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
-                                               .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
-                                               .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
-                                       archive.add(new ArchivedJson(path, 
json));
+                                       if (attempt != null){
+                                               ResponseBody json = 
createAccumulatorInfo(attempt);
+                                               String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                                                       .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
+                                                       .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                                       .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
                                }
                        }
                }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index bae80c7..a73dbc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -117,13 +117,15 @@ public class SubtaskExecutionAttemptDetailsHandler
 
                                for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
                                        AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
-                                       ResponseBody json = 
createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
-                                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
-                                               .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
-                                               .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
-                                               .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
-                                               .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
-                                       archive.add(new ArchivedJson(path, 
json));
+                                       if (attempt != null) {
+                                               ResponseBody json = 
createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
+                                               String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                                                       .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
+                                                       .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                                       .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
                                }
                        }
                }

Reply via email to