[FLINK-5107] Handle evicted execution attempts in request handlers

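If a prior execution attempt cannot be retrieved because it has already
been evicted, the request handler now throws a meaningful exception that
notifies the requester about the evicted execution attempt.

In addition, the size of the prior execution attempt history is now
configured via ConfigConstants ("jobmanager.max-attempts-history-size",
default 16) instead of the removed JobManagerOptions class.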


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/871de0bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/871de0bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/871de0bf

Branch: refs/heads/release-1.1
Commit: 871de0bf7a28a79222406f73048432ab156c7b0f
Parents: 8989a9f
Author: Till Rohrmann <[email protected]>
Authored: Wed Nov 23 00:19:32 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 23 00:46:40 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 13 +++++++
 .../AbstractSubtaskAttemptRequestHandler.java   |  8 ++++-
 .../handlers/RequestHandlerException.java       | 31 ++++++++++++++++
 ...taskExecutionAttemptAccumulatorsHandler.java |  6 ----
 .../executiongraph/ExecutionJobVertex.java      |  8 +++--
 .../runtime/executiongraph/ExecutionVertex.java | 18 +++++-----
 .../runtime/jobmanager/JobManagerOptions.java   | 38 --------------------
 7 files changed, 66 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 1431eae..c70388a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -122,6 +122,13 @@ public final class ConfigConstants {
        public static final String JOB_MANAGER_IPC_PORT_KEY = 
"jobmanager.rpc.port";
 
        /**
+        * The config parameter defining the number of prior execution attempt 
information being stored
+        * on the job manager before the oldest execution attempt information 
is deleted.
+        */
+       @PublicEvolving
+       public static final String JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE = 
"jobmanager.max-attempts-history-size";
+
+       /**
         * The config parameter defining the network port to connect to
         * for communication with the resource manager.
         */
@@ -777,6 +784,12 @@ public final class ConfigConstants {
        public static final int DEFAULT_JOB_MANAGER_IPC_PORT = 6123;
 
        /**
+        * The default number of prior execution attempt information being 
stored on
+        * the job manager before the oldest information is deleted.
+        */
+       public static final int DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE = 
16;
+
+       /**
         * The default network port of the resource manager.
         */
        public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 672df16..0f906fe 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -57,7 +57,13 @@ public abstract class AbstractSubtaskAttemptRequestHandler 
extends AbstractSubta
                }
                else if (attempt >= 0 && attempt < 
currentAttempt.getAttemptNumber()) {
                        Execution exec = 
vertex.getPriorExecutionAttempt(attempt);
-                       return handleRequest(exec, params);
+
+                       if (exec != null) {
+                               return handleRequest(exec, params);
+                       } else {
+                               throw new RequestHandlerException("Execution 
for attempt " + attempt +
+                                       " has already been deleted.");
+                       }
                }
                else {
                        throw new RuntimeException("Attempt does not exist: " + 
attempt);

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
new file mode 100644
index 0000000..bb61d16
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Base class for request handler exceptions.
+ */
+public class RequestHandlerException extends Exception {
+
+       private static final long serialVersionUID = 7570352908725875886L;
+
+       public RequestHandlerException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index f661126..ade241a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -38,12 +38,6 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
 
        @Override
        public String handleRequest(Execution execAttempt, Map<String, String> 
params) throws Exception {
-
-               // return empty string for pruned (== null) execution attempts
-               if (null == execAttempt) {
-                       return "";
-               }
-
                final StringifiedAccumulatorResult[] accs = 
execAttempt.getUserAccumulatorsStringified();
                
                StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7af9868..47aaa45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -38,7 +39,6 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -154,8 +154,10 @@ public class ExecutionJobVertex implements Serializable {
 
                Configuration jobConfiguration = graph.getJobConfiguration();
                int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
-                               
jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
-                               
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+                               jobConfiguration.getInteger(
+                                       
ConfigConstants.JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE,
+                                       
ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE)
+                       : 
ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE;
 
                // create all task vertices
                for (int i = 0; i < numTaskVertices; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6e76d8f..b1e8475 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -36,7 +37,6 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -105,12 +105,12 @@ public class ExecutionVertex implements Serializable {
                        IntermediateResult[] producedDataSets,
                        FiniteDuration timeout) {
                this(
-                               jobVertex,
-                               subTaskIndex,
-                               producedDataSets,
-                               timeout,
-                               System.currentTimeMillis(),
-                               
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
+                       jobVertex,
+                       subTaskIndex,
+                       producedDataSets,
+                       timeout,
+                       System.currentTimeMillis(),
+                       
ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE);
        }
 
        public ExecutionVertex(
@@ -126,7 +126,7 @@ public class ExecutionVertex implements Serializable {
                        ExecutionJobVertex jobVertex,
                        int subTaskIndex,
                        IntermediateResult[] producedDataSets,
-                       Time timeout,
+                       FiniteDuration timeout,
                        long createTimestamp,
                        int maxPriorExecutionHistoryLength) {
                this.jobVertex = jobVertex;
@@ -254,6 +254,8 @@ public class ExecutionVertex implements Serializable {
                synchronized (priorExecutions) {
                        return new EvictingBoundedList<>(priorExecutions);
                }
+       }
+
        public ExecutionGraph getExecutionGraph() {
                return this.jobVertex.getGraph();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
deleted file mode 100644
index 279a70e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-@PublicEvolving
-public class JobManagerOptions {
-
-       /**
-        * The maximum number of prior execution attempts kept in history.
-        */
-       public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
-                       
key("job-manager.max-attempts-history-size").defaultValue(16);
-
-       private JobManagerOptions() {
-               throw new IllegalAccessError();
-       }
-}

Reply via email to