[hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointCoordinator

This makes the exception that can occur during serialization of the
ExecutionConfig explicit, and adds some comments to JobGraph.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f63426b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f63426b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f63426b0

Branch: refs/heads/master
Commit: f63426b0322e05fd0986ae5f224a69b1320724f6
Parents: 50fd1a3
Author: Stephan Ewen <[email protected]>
Authored: Thu Feb 16 18:34:51 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Feb 20 19:43:15 2017 +0100

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  9 ++-
 .../checkpoint/CheckpointCoordinator.java       |  2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java | 84 +++++++++++---------
 .../LeaderChangeJobRecoveryTest.java            |  8 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  4 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 10 ++-
 6 files changed, 69 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 6f7b04a..caeb43f 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -83,6 +83,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -223,7 +224,13 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
 
                // create the job graph object
                JobGraph graph = new JobGraph(jobId, program.getJobName());
-               
graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+               try {
+                       
graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+               }
+               catch (IOException e) {
+                       throw new CompilerException("Could not serialize the 
ExecutionConfig." +
+                                       "This indicates that non-serializable 
types (like custom serializers) were registered");
+               }
 
                graph.setAllowQueuedScheduling(false);
                
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 78cad91..6cac006 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -623,7 +623,7 @@ public class CheckpointCoordinator {
         * @return Flag indicating whether the ack'd checkpoint was associated
         * with a pending checkpoint.
         *
-        * @throws Exception If the checkpoint cannot be added to the completed 
checkpoint store.
+        * @throws CheckpointException If the checkpoint cannot be added to the 
completed checkpoint store.
         */
        public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws CheckpointException {
                if (shutdown || message == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6db9277..f6377e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -53,18 +53,16 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The JobGraph is a graph of vertices and intermediate results that are 
connected together to
  * form a DAG. Note that iterations (feedback edges) are currently not encoded 
inside the JobGraph
- * but inside certain special vertices that establish the feedback channel 
amongst themselves.</p>
+ * but inside certain special vertices that establish the feedback channel 
amongst themselves.
  *
  * <p>The JobGraph defines the job-wide configuration settings, while each 
vertex and intermediate result
- * define the characteristics of the concrete operation and intermediate 
data.</p>
+ * define the characteristics of the concrete operation and intermediate data.
  */
 public class JobGraph implements Serializable {
 
        private static final long serialVersionUID = 1L;
 
-       // 
--------------------------------------------------------------------------------------------
-       // Members that define the structure / topology of the graph
-       // 
--------------------------------------------------------------------------------------------
+       // --- job and configuration ---
 
        /** List of task vertices included in this job graph. */
        private final Map<JobVertexID, JobVertex> taskVertices = new 
LinkedHashMap<JobVertexID, JobVertex>();
@@ -72,12 +70,6 @@ public class JobGraph implements Serializable {
        /** The job configuration attached to this job. */
        private final Configuration jobConfiguration = new Configuration();
 
-       /** Set of JAR files required to run this job. */
-       private final List<Path> userJars = new ArrayList<Path>();
-
-       /** Set of blob keys identifying the JAR files required to run this 
job. */
-       private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
-
        /** ID of this job. May be set if specific job id is desired (e.g. 
session management) */
        private final JobID jobID;
 
@@ -94,18 +86,28 @@ public class JobGraph implements Serializable {
        /** The mode in which the job is scheduled */
        private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-       /** The settings for asynchronous snapshots */
-       private JobSnapshottingSettings snapshotSettings;
-
-       /** List of classpaths required to run this job. */
-       private List<URL> classpaths = Collections.emptyList();
+       // --- checkpointing ---
 
        /** Job specific execution config */
        private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
+       /** The settings for the job checkpoints */
+       private JobSnapshottingSettings snapshotSettings;
+
        /** Savepoint restore settings. */
        private SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.none();
 
+       // --- attached resources ---
+
+       /** Set of JAR files required to run this job. */
+       private final List<Path> userJars = new ArrayList<Path>();
+
+       /** Set of blob keys identifying the JAR files required to run this 
job. */
+       private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
+
+       /** List of classpaths required to run this job. */
+       private List<URL> classpaths = Collections.emptyList();
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -129,7 +131,13 @@ public class JobGraph implements Serializable {
        public JobGraph(JobID jobId, String jobName) {
                this.jobID = jobId == null ? new JobID() : jobId;
                this.jobName = jobName == null ? "(unnamed job)" : jobName;
-               setExecutionConfig(new ExecutionConfig());
+
+               try {
+                       setExecutionConfig(new ExecutionConfig());
+               } catch (IOException e) {
+                       // this should never happen, since an empty execution 
config is always serializable
+                       throw new RuntimeException("bug, empty execution config 
is not serializable");
+               }
        }
 
        /**
@@ -260,17 +268,16 @@ public class JobGraph implements Serializable {
        }
 
        /**
-        * Sets a serialized copy of the passed ExecutionConfig. Further 
modification of the referenced ExecutionConfig
-        * object will not affect this serialized copy.
+        * Sets the execution config. This method eagerly serializes the 
ExecutionConfig for future RPC
+        * transport. Further modification of the referenced ExecutionConfig 
object will not affect
+        * this serialized copy.
+        * 
         * @param executionConfig The ExecutionConfig to be serialized.
+        * @throws IOException Thrown if the serialization of the 
ExecutionConfig fails
         */
-       public void setExecutionConfig(ExecutionConfig executionConfig) {
+       public void setExecutionConfig(ExecutionConfig executionConfig) throws 
IOException {
                checkNotNull(executionConfig, "ExecutionConfig must not be 
null.");
-               try {
-                       this.serializedExecutionConfig = new 
SerializedValue<>(executionConfig);
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not serialize 
ExecutionConfig.", e);
-               }
+               this.serializedExecutionConfig = new 
SerializedValue<>(executionConfig);
        }
 
        /**
@@ -362,6 +369,21 @@ public class JobGraph implements Serializable {
                return classpaths;
        }
 
+       /**
+        * Gets the maximum parallelism of all operations in this job graph.
+        *
+        * @return The maximum parallelism of this job graph
+        */
+       public int getMaximumParallelism() {
+               int maxParallelism = -1;
+               for (JobVertex vertex : taskVertices.values()) {
+                       maxParallelism = Math.max(vertex.getParallelism(), 
maxParallelism);
+               }
+               return maxParallelism;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Topological Graph Access
        // 
--------------------------------------------------------------------------------------------
 
        public List<JobVertex> getVerticesSortedTopologicallyFromSources() 
throws InvalidProgramException {
@@ -539,18 +561,6 @@ public class JobGraph implements Serializable {
        }
 
        /**
-        * Gets the maximum parallelism of all operations in this job graph.
-        * @return The maximum parallelism of this job graph
-        */
-       public int getMaximumParallelism() {
-               int maxParallelism = -1;
-               for (JobVertex vertex : taskVertices.values()) {
-                       maxParallelism = Math.max(vertex.getParallelism(), 
maxParallelism);
-               }
-               return maxParallelism;
-       }
-
-       /**
         * Uploads the previously added user JAR files to the job manager 
through
         * the job manager's BLOB server. The respective port is retrieved from 
the
         * JobManager. This function issues a blocking call.

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index be26e7b..fe33022 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -136,11 +135,6 @@ public class LeaderChangeJobRecoveryTest extends 
TestLogger {
                sender.setSlotSharingGroup(slotSharingGroup);
                receiver.setSlotSharingGroup(slotSharingGroup);
 
-               ExecutionConfig executionConfig = new ExecutionConfig();
-
-               JobGraph jobGraph = new JobGraph("Blocking test job", sender, 
receiver);
-               jobGraph.setExecutionConfig(executionConfig);
-
-               return jobGraph;
+               return new JobGraph("Blocking test job", sender, receiver);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f656622..f90367c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 /**
  * Integration test cases for the {@link MiniCluster}.
  */
@@ -95,7 +97,7 @@ public class MiniClusterITCase extends TestLogger {
                miniCluster.runJobBlocking(job);
        }
 
-       private static JobGraph getSimpleJob() {
+       private static JobGraph getSimpleJob() throws IOException {
                JobVertex task = new JobVertex("Test task");
                task.setParallelism(1);
                task.setMaxParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8877c80..a4bb165 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -50,6 +51,7 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -129,7 +131,13 @@ public class StreamingJobGraphGenerator {
                configureCheckpointing();
 
                // set the ExecutionConfig last when it has been finalized
-               jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+               try {
+                       
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+               }
+               catch (IOException e) {
+                       throw new IllegalConfigurationException("Could not 
serialize the ExecutionConfig." +
+                                       "This indicates that non-serializable 
types (like custom serializers) were registered");
+               }
 
                return jobGraph;
        }

Reply via email to