[FLINK-5107] Handle evicted execution attempts in request handlers

If a prior execution attempt cannot be retrieved because it has already been
evicted, the request handler now throws a meaningful exception to notify the
requester about the evicted execution attempt.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c23879a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c23879a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c23879a

Branch: refs/heads/master
Commit: 4c23879a5743b862b93d3c4267244235266163e7
Parents: f5af7f1
Author: Till Rohrmann <[email protected]>
Authored: Wed Nov 23 00:19:32 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 23 00:19:32 2016 +0100

----------------------------------------------------------------------
 .../AbstractSubtaskAttemptRequestHandler.java   |  8 ++++-
 .../handlers/RequestHandlerException.java       | 31 ++++++++++++++++++++
 ...taskExecutionAttemptAccumulatorsHandler.java |  6 ----
 3 files changed, 38 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index f3a5059..1eab21c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -57,7 +57,13 @@ public abstract class AbstractSubtaskAttemptRequestHandler 
extends AbstractSubta
                }
                else if (attempt >= 0 && attempt < 
currentAttempt.getAttemptNumber()) {
                        AccessExecution exec = 
vertex.getPriorExecutionAttempt(attempt);
-                       return handleRequest(exec, params);
+
+                       if (exec != null) {
+                               return handleRequest(exec, params);
+                       } else {
+                               throw new RequestHandlerException("Execution 
for attempt " + attempt +
+                                       " has already been deleted.");
+                       }
                }
                else {
                        throw new RuntimeException("Attempt does not exist: " + 
attempt);

http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
new file mode 100644
index 0000000..bb61d16
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Base class for request handler exceptions.
+ */
+public class RequestHandlerException extends Exception {
+
+       private static final long serialVersionUID = 7570352908725875886L;
+
+       public RequestHandlerException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4c23879a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 675304f..e613efb 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -38,12 +38,6 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
 
        @Override
        public String handleRequest(AccessExecution execAttempt, Map<String, 
String> params) throws Exception {
-
-               // return empty string for pruned (== null) execution attempts
-               if (null == execAttempt) {
-                       return "";
-               }
-
                final StringifiedAccumulatorResult[] accs = 
execAttempt.getUserAccumulatorsStringified();
                
                StringWriter writer = new StringWriter();

Reply via email to