This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a34563  [FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration
9a34563 is described below

commit 9a345633dab41906239f21bc2c27ad3c5b6f16df
Author: dcadmin <[email protected]>
AuthorDate: Fri May 3 16:37:36 2019 +0800

    [FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration
    
    This closes #8268.
---
 .../runtime/executiongraph/ExecutionGraph.java     | 46 +++++++++++++++++++---
 .../executiongraph/ExecutionGraphBuilder.java      |  5 +++
 .../runtime/executiongraph/ExecutionJobVertex.java | 24 +++++------
 3 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1b839d6..6a4af57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -255,6 +256,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
         * from results that need to be materialized. */
        private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
+       /** The maximum number of prior execution attempts kept in history. */
+       private final int maxPriorAttemptsHistoryLength;
+
        // ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
        private final AtomicInteger verticesFinished;
@@ -373,12 +377,39 @@ public class ExecutionGraph implements AccessExecutionGraph {
                        timeout);
        }
 
+       @VisibleForTesting
+       public ExecutionGraph(
+                       JobInformation jobInformation,
+                       ScheduledExecutorService futureExecutor,
+                       Executor ioExecutor,
+                       Time timeout,
+                       RestartStrategy restartStrategy,
+                       FailoverStrategy.Factory failoverStrategy,
+                       SlotProvider slotProvider,
+                       ClassLoader userClassLoader,
+                       BlobWriter blobWriter,
+                       Time allocationTimeout) throws IOException {
+               this(
+                       jobInformation,
+                       futureExecutor,
+                       ioExecutor,
+                       timeout,
+                       restartStrategy,
+                       
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
+                       failoverStrategy,
+                       slotProvider,
+                       userClassLoader,
+                       blobWriter,
+                       allocationTimeout);
+       }
+
        public ExecutionGraph(
                        JobInformation jobInformation,
                        ScheduledExecutorService futureExecutor,
                        Executor ioExecutor,
                        Time rpcTimeout,
                        RestartStrategy restartStrategy,
+                       int maxPriorAttemptsHistoryLength,
                        FailoverStrategy.Factory failoverStrategyFactory,
                        SlotProvider slotProvider,
                        ClassLoader userClassLoader,
@@ -423,6 +454,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
                // is ready by the time the failover strategy sees it
                this.failoverStrategy = 
checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
 
+               this.maxPriorAttemptsHistoryLength = 
maxPriorAttemptsHistoryLength;
+
                this.schedulingFuture = null;
                this.jobMasterMainThreadExecutor =
                        new 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
@@ -825,12 +858,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
                        // create the execution job vertex and attach it to the graph
                        ExecutionJobVertex ejv = new ExecutionJobVertex(
-                               this,
-                               jobVertex,
-                               1,
-                               rpcTimeout,
-                               globalModVersion,
-                               createTimestamp);
+                                       this,
+                                       jobVertex,
+                                       1,
+                                       maxPriorAttemptsHistoryLength,
+                                       rpcTimeout,
+                                       globalModVersion,
+                                       createTimestamp);
 
                        ejv.connectToPredecessors(this.intermediateResults);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ac4a759..fc63e4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
@@ -110,6 +111,9 @@ public class ExecutionGraphBuilder {
                        jobGraph.getUserJarBlobKeys(),
                        jobGraph.getClasspaths());
 
+               final int maxPriorAttemptsHistoryLength =
+                               
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
+
                // create a new execution graph, if none exists so far
                final ExecutionGraph executionGraph;
                try {
@@ -120,6 +124,7 @@ public class ExecutionGraphBuilder {
                                        ioExecutor,
                                        rpcTimeout,
                                        restartStrategy,
+                                       maxPriorAttemptsHistoryLength,
                                        failoverStrategy,
                                        slotProvider,
                                        classLoader,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6b1887c..9254ba2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -141,18 +140,26 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
         */
        @VisibleForTesting
        ExecutionJobVertex(
-               ExecutionGraph graph,
-               JobVertex jobVertex,
-               int defaultParallelism,
-               Time timeout) throws JobException {
+                       ExecutionGraph graph,
+                       JobVertex jobVertex,
+                       int defaultParallelism,
+                       Time timeout) throws JobException {
 
-               this(graph, jobVertex, defaultParallelism, timeout, 1L, 
System.currentTimeMillis());
+               this(
+                       graph,
+                       jobVertex,
+                       defaultParallelism,
+                       
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
+                       timeout,
+                       1L,
+                       System.currentTimeMillis());
        }
 
        public ExecutionJobVertex(
                        ExecutionGraph graph,
                        JobVertex jobVertex,
                        int defaultParallelism,
+                       int maxPriorAttemptsHistoryLength,
                        Time timeout,
                        long initialGlobalModVersion,
                        long createTimestamp) throws JobException {
@@ -214,11 +221,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
                                        result.getResultType());
                }
 
-               Configuration jobConfiguration = graph.getJobConfiguration();
-               int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
-                               
jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
-                               
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
-
                // create all task vertices
                for (int i = 0; i < numTaskVertices; i++) {
                        ExecutionVertex vertex = new ExecutionVertex(

Reply via email to