[hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointCoordinator
This makes the exception that can occur during serialization of the ExecutionConfig explicit, and adds some comments to JobGraph. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f63426b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f63426b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f63426b0 Branch: refs/heads/master Commit: f63426b0322e05fd0986ae5f224a69b1320724f6 Parents: 50fd1a3 Author: Stephan Ewen <[email protected]> Authored: Thu Feb 16 18:34:51 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Feb 20 19:43:15 2017 +0100 ---------------------------------------------------------------------- .../plantranslate/JobGraphGenerator.java | 9 ++- .../checkpoint/CheckpointCoordinator.java | 2 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 84 +++++++++++--------- .../LeaderChangeJobRecoveryTest.java | 8 +- .../runtime/minicluster/MiniClusterITCase.java | 4 +- .../api/graph/StreamingJobGraphGenerator.java | 10 ++- 6 files changed, 69 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 6f7b04a..caeb43f 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -83,6 +83,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.StringUtils; import org.apache.flink.util.Visitor; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -223,7 +224,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> { // create the job graph object JobGraph graph = new JobGraph(jobId, program.getJobName()); - graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig()); + try { + graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig()); + } + catch (IOException e) { + throw new CompilerException("Could not serialize the ExecutionConfig." + + "This indicates that non-serializable types (like custom serializers) were registered"); + } graph.setAllowQueuedScheduling(false); graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout()); http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 78cad91..6cac006 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -623,7 +623,7 @@ public class CheckpointCoordinator { * @return Flag indicating whether the ack'd checkpoint was associated * with a pending checkpoint. * - * @throws Exception If the checkpoint cannot be added to the completed checkpoint store. + * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store. */ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException { if (shutdown || message == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 6db9277..f6377e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -53,18 +53,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph - * but inside certain special vertices that establish the feedback channel amongst themselves.</p> + * but inside certain special vertices that establish the feedback channel amongst themselves. * * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result - * define the characteristics of the concrete operation and intermediate data.</p> + * define the characteristics of the concrete operation and intermediate data. */ public class JobGraph implements Serializable { private static final long serialVersionUID = 1L; - // -------------------------------------------------------------------------------------------- - // Members that define the structure / topology of the graph - // -------------------------------------------------------------------------------------------- + // --- job and configuration --- /** List of task vertices included in this job graph. */ private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>(); @@ -72,12 +70,6 @@ public class JobGraph implements Serializable { /** The job configuration attached to this job. */ private final Configuration jobConfiguration = new Configuration(); - /** Set of JAR files required to run this job. */ - private final List<Path> userJars = new ArrayList<Path>(); - - /** Set of blob keys identifying the JAR files required to run this job. */ - private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>(); - /** ID of this job. May be set if specific job id is desired (e.g. session management) */ private final JobID jobID; @@ -94,18 +86,28 @@ public class JobGraph implements Serializable { /** The mode in which the job is scheduled */ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; - /** The settings for asynchronous snapshots */ - private JobSnapshottingSettings snapshotSettings; - - /** List of classpaths required to run this job. */ - private List<URL> classpaths = Collections.emptyList(); + // --- checkpointing --- /** Job specific execution config */ private SerializedValue<ExecutionConfig> serializedExecutionConfig; + /** The settings for the job checkpoints */ + private JobSnapshottingSettings snapshotSettings; + /** Savepoint restore settings. */ private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none(); + // --- attached resources --- + + /** Set of JAR files required to run this job. */ + private final List<Path> userJars = new ArrayList<Path>(); + + /** Set of blob keys identifying the JAR files required to run this job. */ + private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>(); + + /** List of classpaths required to run this job. */ + private List<URL> classpaths = Collections.emptyList(); + // -------------------------------------------------------------------------------------------- /** @@ -129,7 +131,13 @@ public class JobGraph implements Serializable { public JobGraph(JobID jobId, String jobName) { this.jobID = jobId == null ? new JobID() : jobId; this.jobName = jobName == null ? "(unnamed job)" : jobName; - setExecutionConfig(new ExecutionConfig()); + + try { + setExecutionConfig(new ExecutionConfig()); + } catch (IOException e) { + // this should never happen, since an empty execution config is always serializable + throw new RuntimeException("bug, empty execution config is not serializable"); + } } /** @@ -260,17 +268,16 @@ public class JobGraph implements Serializable { } /** - * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig - * object will not affect this serialized copy. + * Sets the execution config. This method eagerly serialized the ExecutionConfig for future RPC + * transport. Further modification of the referenced ExecutionConfig object will not affect + * this serialized copy. + * * @param executionConfig The ExecutionConfig to be serialized. + * @throws IOException Thrown if the serialization of the ExecutionConfig fails */ - public void setExecutionConfig(ExecutionConfig executionConfig) { + public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException { checkNotNull(executionConfig, "ExecutionConfig must not be null."); - try { - this.serializedExecutionConfig = new SerializedValue<>(executionConfig); - } catch (IOException e) { - throw new RuntimeException("Could not serialize ExecutionConfig.", e); - } + this.serializedExecutionConfig = new SerializedValue<>(executionConfig); } /** @@ -362,6 +369,21 @@ public class JobGraph implements Serializable { return classpaths; } + /** + * Gets the maximum parallelism of all operations in this job graph. + * + * @return The maximum parallelism of this job graph + */ + public int getMaximumParallelism() { + int maxParallelism = -1; + for (JobVertex vertex : taskVertices.values()) { + maxParallelism = Math.max(vertex.getParallelism(), maxParallelism); + } + return maxParallelism; + } + + // -------------------------------------------------------------------------------------------- + // Topological Graph Access // -------------------------------------------------------------------------------------------- public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException { @@ -539,18 +561,6 @@ public class JobGraph implements Serializable { } /** - * Gets the maximum parallelism of all operations in this job graph. - * @return The maximum parallelism of this job graph - */ - public int getMaximumParallelism() { - int maxParallelism = -1; - for (JobVertex vertex : taskVertices.values()) { - maxParallelism = Math.max(vertex.getParallelism(), maxParallelism); - } - return maxParallelism; - } - - /** * Uploads the previously added user JAR files to the job manager through * the job manager's BLOB server. The respective port is retrieved from the * JobManager. This function issues a blocking call. http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java index be26e7b..fe33022 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.leaderelection; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -136,11 +135,6 @@ public class LeaderChangeJobRecoveryTest extends TestLogger { sender.setSlotSharingGroup(slotSharingGroup); receiver.setSlotSharingGroup(slotSharingGroup); - ExecutionConfig executionConfig = new ExecutionConfig(); - - JobGraph jobGraph = new JobGraph("Blocking test job", sender, receiver); - jobGraph.setExecutionConfig(executionConfig); - - return jobGraph; + return new JobGraph("Blocking test job", sender, receiver); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index f656622..f90367c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -29,6 +29,8 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.io.IOException; + /** * Integration test cases for the {@link MiniCluster}. */ @@ -95,7 +97,7 @@ public class MiniClusterITCase extends TestLogger { miniCluster.runJobBlocking(job); } - private static JobGraph getSimpleJob() { + private static JobGraph getSimpleJob() throws IOException { JobVertex task = new JobVertex("Test task"); task.setParallelism(1); task.setMaxParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 8877c80..a4bb165 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -50,6 +51,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -129,7 +131,13 @@ public class StreamingJobGraphGenerator { configureCheckpointing(); // set the ExecutionConfig last when it has been finalized - jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); + try { + jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); + } + catch (IOException e) { + throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." + + "This indicates that non-serializable types (like custom serializers) were registered"); + } return jobGraph; }
