[FLINK-4776] [distributed coordination] Move ExecutionGraph initialization into 
the dedicated class ExecutionGraphBuilder


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05436f4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05436f4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05436f4b

Branch: refs/heads/flip-6
Commit: 05436f4b64e771b22f13f56ff9e0ea7aa94b4ff7
Parents: 21b9f16
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 7 19:58:24 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   4 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionGraphBuilder.java   | 262 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 168 ++----------
 4 files changed, 297 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4428427..e95afe0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -171,7 +171,7 @@ public class CheckpointCoordinator {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
                        SavepointStore savepointStore,
-                       CheckpointStatsTracker statsTracker) throws Exception {
+                       CheckpointStatsTracker statsTracker) {
 
                // sanity checks
                checkArgument(baseInterval > 0, "Checkpoint timeout must be 
larger than zero");
@@ -207,7 +207,7 @@ public class CheckpointCoordinator {
                        // issues a blocking call to ZooKeeper.
                        checkpointIDCounter.start();
                } catch (Throwable t) {
-                       throw new Exception("Failed to start checkpoint ID 
counter: " + t.getMessage(), t);
+                       throw new RuntimeException("Failed to start checkpoint 
ID counter: " + t.getMessage(), t);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 6023205..cf98ca6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -359,7 +359,7 @@ public class ExecutionGraph {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore checkpointStore,
                        SavepointStore savepointStore,
-                       CheckpointStatsTracker statsTracker) throws Exception {
+                       CheckpointStatsTracker statsTracker) {
 
                // simple sanity checks
                if (interval < 10 || checkpointTimeout < 10) {
@@ -374,7 +374,11 @@ public class ExecutionGraph {
                ExecutionVertex[] tasksToCommitTo = 
collectExecutionVertices(verticesToCommitTo);
 
                // disable to make sure existing checkpoint coordinators are 
cleared
-               disableSnaphotCheckpointing();
+               try {
+                       disableSnaphotCheckpointing();
+               } catch (Throwable t) {
+                       LOG.error("Error while shutting down checkpointer.");
+               }
 
                checkpointStatsTracker = Objects.requireNonNull(statsTracker, 
"Checkpoint stats tracker");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
new file mode 100644
index 0000000..1c6eb8d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class to encapsulate the logic of building an {@link 
ExecutionGraph} from a {@link JobGraph}.
+ */
+public class ExecutionGraphBuilder {
+
+       /**
+        * Builds the ExecutionGraph from the JobGraph.
+        * If a prior execution graph exists, the JobGraph will be attached. If 
no prior execution
+        * graph exists, then the JobGraph will become attach to a new emoty 
execution graph.
+        */
+       public static ExecutionGraph buildGraph(
+                       @Nullable ExecutionGraph prior,
+                       JobGraph jobGraph,
+                       Configuration jobManagerConfig,
+                       Executor executor,
+                       ClassLoader classLoader,
+                       CheckpointRecoveryFactory recoveryFactory,
+                       SavepointStore savepointStore,
+                       Time timeout,
+                       RestartStrategy restartStrategy,
+                       MetricGroup metrics,
+                       int parallelismForAutoMax,
+                       Logger log)
+                       throws JobExecutionException, JobException
+       {
+               final ExecutionContext executionContext = 
ExecutionContext$.MODULE$.fromExecutor(executor);
+               
+               return buildGraph(prior, jobGraph, jobManagerConfig, 
executionContext,
+                               classLoader, recoveryFactory, savepointStore, 
timeout, restartStrategy,
+                               metrics, parallelismForAutoMax, log);
+       }
+
+       /**
+        * Builds the ExecutionGraph from the JobGraph.
+        * If a prior execution graph exists, the JobGraph will be attached. If 
no prior execution
+        * graph exists, then the JobGraph will become attach to a new emoty 
execution graph.
+        */
+       public static ExecutionGraph buildGraph(
+                       @Nullable ExecutionGraph prior,
+                       JobGraph jobGraph,
+                       Configuration jobManagerConfig,
+                       ExecutionContext executionContext,
+                       ClassLoader classLoader,
+                       CheckpointRecoveryFactory recoveryFactory,
+                       SavepointStore savepointStore,
+                       Time timeout,
+                       RestartStrategy restartStrategy,
+                       MetricGroup metrics,
+                       int parallelismForAutoMax,
+                       Logger log)
+               throws JobExecutionException, JobException
+       {
+               checkNotNull(jobGraph, "job graph cannot be null");
+
+               final String jobName = jobGraph.getName();
+               final JobID jobId = jobGraph.getJobID();
+
+               // create a new execution graph, if none exists so far
+               final ExecutionGraph executionGraph = (prior != null) ? prior :
+                               new ExecutionGraph(
+                                               executionContext,
+                                               jobId,
+                                               jobName,
+                                               jobGraph.getJobConfiguration(),
+                                               
jobGraph.getSerializedExecutionConfig(),
+                                               new 
FiniteDuration(timeout.getSize(), timeout.getUnit()),
+                                               restartStrategy,
+                                               jobGraph.getUserJarBlobKeys(),
+                                               jobGraph.getClasspaths(),
+                                               classLoader,
+                                               metrics);
+
+               // set the basic properties
+
+               executionGraph.setScheduleMode(jobGraph.getScheduleMode());
+               
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
+
+               try {
+                       
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
+               }
+               catch (Throwable t) {
+                       log.warn("Cannot create JSON plan for job", t);
+                       // give the graph an empty plan
+                       executionGraph.setJsonPlan("{}");
+               }
+
+               // initialize the vertices that have a master initialization 
hook
+               // file output formats create directories here, input formats 
create splits
+
+               final long initMasterStart = System.nanoTime();
+               log.info("Running initialization on master for job {} ({}).", 
jobName, jobId);
+
+               for (JobVertex vertex : jobGraph.getVertices()) {
+                       String executableClass = vertex.getInvokableClassName();
+                       if (executableClass == null || 
executableClass.isEmpty()) {
+                               throw new JobSubmissionException(jobId,
+                                               "The vertex " + vertex.getID() 
+ " (" + vertex.getName() + ") has no invokable class.");
+                       }
+
+                       if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_AUTO_MAX) {
+                               vertex.setParallelism(parallelismForAutoMax);
+                       }
+
+                       try {
+                               vertex.initializeOnMaster(classLoader);
+                       }
+                       catch (Throwable t) {
+                                       throw new JobExecutionException(jobId,
+                                                       "Cannot initialize task 
'" + vertex.getName() + "': " + t.getMessage(), t);
+                       }
+               }
+
+               log.info("Successfully ran initialization on master in {} ms.",
+                               (System.nanoTime() - initMasterStart) / 
1_000_000);
+
+               // topologically sort the job vertices and attach the graph to 
the existing one
+               List<JobVertex> sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               if (log.isDebugEnabled()) {
+                       log.debug("Adding {} vertices from job graph {} ({}).", 
sortedTopology.size(), jobName, jobId);
+               }
+               executionGraph.attachJobGraph(sortedTopology);
+
+               if (log.isDebugEnabled()) {
+                       log.debug("Successfully created execution graph from 
job graph {} ({}).", jobName, jobId);
+               }
+
+               // configure the state checkpointing
+               JobSnapshottingSettings snapshotSettings = 
jobGraph.getSnapshotSettings();
+               if (snapshotSettings != null) {
+
+                       List<ExecutionJobVertex> triggerVertices = 
+                                       
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
+
+                       List<ExecutionJobVertex> ackVertices =
+                                       
idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
+
+                       List<ExecutionJobVertex> confirmVertices =
+                                       
idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
+
+                       CompletedCheckpointStore completedCheckpoints;
+                       CheckpointIDCounter checkpointIdCounter;
+                       try {
+                               completedCheckpoints = 
recoveryFactory.createCheckpointStore(jobId, classLoader);
+                               checkpointIdCounter = 
recoveryFactory.createCheckpointIDCounter(jobId);
+                       }
+                       catch (Exception e) {
+                               throw new JobExecutionException(jobId, "Failed 
to initialize high-availability checkpoint handler", e);
+                       }
+
+                       // Checkpoint stats tracker
+                       boolean isStatsDisabled = jobManagerConfig.getBoolean(
+                                       
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+                                       
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
+
+                       CheckpointStatsTracker checkpointStatsTracker;
+                       if (isStatsDisabled) {
+                               checkpointStatsTracker = new 
DisabledCheckpointStatsTracker();
+                       }
+                       else {
+                               int historySize = jobManagerConfig.getInteger(
+                                               
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+                                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+
+                               checkpointStatsTracker = new 
SimpleCheckpointStatsTracker(historySize, ackVertices, metrics);
+                       }
+
+                       executionGraph.enableSnapshotCheckpointing(
+                                       
snapshotSettings.getCheckpointInterval(),
+                                       snapshotSettings.getCheckpointTimeout(),
+                                       
snapshotSettings.getMinPauseBetweenCheckpoints(),
+                                       
snapshotSettings.getMaxConcurrentCheckpoints(),
+                                       triggerVertices,
+                                       ackVertices,
+                                       confirmVertices,
+                                       checkpointIdCounter,
+                                       completedCheckpoints,
+                                       savepointStore,
+                                       checkpointStatsTracker);
+               }
+
+               return executionGraph;
+       }
+
+       private static List<ExecutionJobVertex> idToVertex(
+                       List<JobVertexID> jobVertices, ExecutionGraph 
executionGraph) throws IllegalArgumentException {
+
+               List<ExecutionJobVertex> result = new 
ArrayList<>(jobVertices.size());
+
+               for (JobVertexID id : jobVertices) {
+                       ExecutionJobVertex vertex = 
executionGraph.getJobVertex(id);
+                       if (vertex != null) {
+                               result.add(vertex);
+                       } else {
+                               throw new IllegalArgumentException(
+                                               "The snapshot checkpointing 
settings refer to non-existent vertex " + id);
+                       } 
+               }
+
+               return result;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /** This class is not supposed to be instantiated */
+       private ExecutionGraphBuilder() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 01f9cec..e90f2d2 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -29,7 +29,8 @@ import akka.actor.Status.{Failure, Success}
 import akka.actor._
 import akka.pattern.ask
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.{ExecutionConfig, JobID}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
@@ -49,11 +50,10 @@ import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex, StatusListenerMessenger}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraphBuilder, 
ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.io.network.PartitionState
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, 
LeaderElectionService, StandaloneLeaderElectionService}
@@ -1114,7 +1114,7 @@ class JobManager(
           Option(jobGraph.getSerializedExecutionConfig()
             .deserializeValue(userCodeLoader)
             .getRestartStrategy())
-            .map(RestartStrategyFactory.createRestartStrategy(_)) match {
+            .map(RestartStrategyFactory.createRestartStrategy) match {
             case Some(strategy) => strategy
             case None => restartStrategyFactory.createRestartStrategy()
           }
@@ -1131,148 +1131,34 @@ class JobManager(
             new UnregisteredMetricsGroup()
         }
 
+        val numSlots = scheduler.getTotalNumberOfSlots()
+
         // see if there already exists an ExecutionGraph for the corresponding 
job ID
-        executionGraph = currentJobs.get(jobGraph.getJobID) match {
+        val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
           case Some((graph, currentJobInfo)) =>
+            executionGraph = graph
             currentJobInfo.setLastActive()
-            graph
+            false
           case None =>
-            val graph = new ExecutionGraph(
-              executionContext,
-              jobGraph.getJobID,
-              jobGraph.getName,
-              jobGraph.getJobConfiguration,
-              jobGraph.getSerializedExecutionConfig,
-              timeout,
-              restartStrategy,
-              jobGraph.getUserJarBlobKeys,
-              jobGraph.getClasspaths,
-              userCodeLoader,
-              jobMetrics)
-
-            currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
-            graph
-        }
-
-        executionGraph.setScheduleMode(jobGraph.getScheduleMode())
-        
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
-
-        try {
-          executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))
-        }
-        catch {
-          case t: Throwable =>
-            log.warn("Cannot create JSON plan for job", t)
-            executionGraph.setJsonPlan("{}")
-        }
-
-        // initialize the vertices that have a master initialization hook
-        // file output formats create directories here, input formats create 
splits
-        if (log.isDebugEnabled) {
-          log.debug(s"Running initialization on master for job $jobId 
($jobName).")
-        }
-
-        val numSlots = scheduler.getTotalNumberOfSlots()
-
-        for (vertex <- jobGraph.getVertices.asScala) {
-          val executableClass = vertex.getInvokableClassName
-          if (executableClass == null || executableClass.length == 0) {
-            throw new JobSubmissionException(jobId,
-              s"The vertex ${vertex.getID} (${vertex.getName}) has no 
invokable class.")
-          }
-
-          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) 
{
-            vertex.setParallelism(numSlots)
-          }
-
-          try {
-            vertex.initializeOnMaster(userCodeLoader)
-          }
-          catch {
-            case t: Throwable =>
-              throw new JobExecutionException(jobId,
-                "Cannot initialize task '" + vertex.getName() + "': " + 
t.getMessage, t)
-          }
+            true
         }
 
-        // topologically sort the job vertices and attach the graph to the 
existing one
-        val sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources()
-        if (log.isDebugEnabled) {
-          log.debug(s"Adding ${sortedTopology.size()} vertices from " +
-            s"job graph $jobId ($jobName).")
-        }
-        executionGraph.attachJobGraph(sortedTopology)
-
-        if (log.isDebugEnabled) {
-          log.debug("Successfully created execution graph from job " +
-            s"graph $jobId ($jobName).")
-        }
-
-        // configure the state checkpointing
-        val snapshotSettings = jobGraph.getSnapshotSettings
-        if (snapshotSettings != null) {
-          val jobId = jobGraph.getJobID()
-
-          val idToVertex: JobVertexID => ExecutionJobVertex = id => {
-            val vertex = executionGraph.getJobVertex(id)
-            if (vertex == null) {
-              throw new JobSubmissionException(jobId,
-                "The snapshot checkpointing settings refer to non-existent 
vertex " + id)
-            }
-            vertex
-          }
-
-          val triggerVertices: java.util.List[ExecutionJobVertex] =
-            
snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
-
-          val ackVertices: java.util.List[ExecutionJobVertex] =
-            
snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
-
-          val confirmVertices: java.util.List[ExecutionJobVertex] =
-            
snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
-
-          val completedCheckpoints = checkpointRecoveryFactory
-            .createCheckpointStore(jobId, userCodeLoader)
-
-          val checkpointIdCounter = 
checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
-
-          // Checkpoint stats tracker
-          val isStatsDisabled: Boolean = flinkConfiguration.getBoolean(
-            ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-            ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE)
-
-          val checkpointStatsTracker : CheckpointStatsTracker =
-            if (isStatsDisabled) {
-              new DisabledCheckpointStatsTracker()
-            } else {
-              val historySize: Int = flinkConfiguration.getInteger(
-                ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-                
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
-
-              new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
-            }
-
-          val jobParallelism = jobGraph.getSerializedExecutionConfig
-            .deserializeValue(userCodeLoader).getParallelism()
-
-          val parallelism = if (jobParallelism == 
ExecutionConfig.PARALLELISM_AUTO_MAX) {
-            numSlots
-          } else {
-            jobParallelism
-          }
-
-          executionGraph.enableSnapshotCheckpointing(
-            snapshotSettings.getCheckpointInterval,
-            snapshotSettings.getCheckpointTimeout,
-            snapshotSettings.getMinPauseBetweenCheckpoints,
-            snapshotSettings.getMaxConcurrentCheckpoints,
-            triggerVertices,
-            ackVertices,
-            confirmVertices,
-            checkpointIdCounter,
-            completedCheckpoints,
-            savepointStore,
-            checkpointStatsTracker)
+        executionGraph = ExecutionGraphBuilder.buildGraph(
+          executionGraph,
+          jobGraph,
+          flinkConfiguration,
+          executionContext,
+          userCodeLoader,
+          checkpointRecoveryFactory,
+          savepointStore,
+          Time.of(timeout.length, timeout.unit),
+          restartStrategy,
+          jobMetrics,
+          numSlots,
+          log.logger)
+        
+        if (registerNewGraph) {
+          currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
         }
 
         // get notified about job status changes

Reply via email to