[FLINK-3701] enable reuse of ExecutionConfig. Depending on the context, the ExecutionConfig's type fields may be deserialized using either a custom class loader or the default class loader. It may be explicitly serialized for the Task or shipped inside the PojoSerializer where it is serialized or directly passed in local mode. An ExecutionConfig may be reused, and thus its fields can't be set to null after it has been shipped once.
The entire ExecutionConfig is now serialized upon setting it on the JobGraph. It is not passed through the JobGraph's constructor but set explicitly on the JobGraph. If no ExecutionConfig has been set, the default is used. Unlike before, no code may modify the ExecutionConfig after it has been set on the JobGraph. This closes #1913 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48b469ad Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48b469ad Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48b469ad Branch: refs/heads/master Commit: 48b469ad4f0da466b347071cea82913965645de3 Parents: 099fdfa Author: Maximilian Michels <[email protected]> Authored: Mon May 2 11:41:05 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri May 13 18:01:38 2016 +0200 ---------------------------------------------------------------------- .../flink/api/common/ExecutionConfig.java | 98 +---------- .../flink/api/common/ExecutionConfigTest.java | 27 +++ .../plantranslate/JobGraphGenerator.java | 14 +- .../webmonitor/handlers/JobConfigHandler.java | 8 +- .../BackPressureStatsTrackerITCase.java | 2 +- .../StackTraceSampleCoordinatorITCase.java | 2 +- .../deployment/TaskDeploymentDescriptor.java | 14 +- .../runtime/executiongraph/ExecutionGraph.java | 16 +- .../runtime/executiongraph/ExecutionVertex.java | 4 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 65 +++---- .../apache/flink/runtime/taskmanager/Task.java | 30 ++-- .../flink/runtime/jobmanager/JobManager.scala | 23 +-- .../checkpoint/CoordinatorShutdownTest.java | 4 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 4 +- .../client/JobClientActorRecoveryITCase.java | 2 +- .../runtime/client/JobClientActorTest.java | 2 +- .../TaskDeploymentDescriptorTest.java | 8 +- .../ExecutionGraphConstructionTest.java | 20 +-- .../ExecutionGraphDeploymentTest.java | 10 +- .../ExecutionGraphRestartTest.java | 49 +++--- 
.../ExecutionGraphSignalsTest.java | 4 +- .../executiongraph/ExecutionGraphTestUtils.java | 7 +- .../ExecutionStateProgressTest.java | 176 +++++++++---------- .../executiongraph/LocalInputSplitsTest.java | 10 +- .../executiongraph/PointwisePatternTest.java | 28 +-- .../TerminalStateDeadlockTest.java | 4 +- .../VertexLocationConstraintTest.java | 26 +-- .../executiongraph/VertexSlotSharingTest.java | 4 +- .../PartialConsumePipelinedResultTest.java | 3 +- .../flink/runtime/jobgraph/JobGraphTest.java | 13 +- .../jobgraph/jsonplan/JsonGeneratorTest.java | 2 +- .../runtime/jobmanager/JobManagerTest.java | 6 +- .../flink/runtime/jobmanager/JobSubmitTest.java | 4 +- .../SlotCountExceedingParallelismTest.java | 2 +- .../StandaloneSubmittedJobGraphStoreTest.java | 2 +- .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 2 +- .../ScheduleOrUpdateConsumersTest.java | 1 - .../LeaderChangeJobRecoveryTest.java | 5 +- .../LeaderChangeStateCleanupTest.java | 2 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 5 +- .../TaskCancelAsyncProducerConsumerITCase.java | 2 +- .../runtime/taskmanager/TaskCancelTest.java | 2 +- .../runtime/taskmanager/TaskManagerTest.java | 32 ++-- .../flink/runtime/taskmanager/TaskStopTest.java | 3 +- .../flink/runtime/taskmanager/TaskTest.java | 5 +- .../TaskManagerLossFailsTasksTest.scala | 8 +- .../jobmanager/CoLocationConstraintITCase.scala | 2 +- .../runtime/jobmanager/JobManagerITCase.scala | 46 +++-- .../runtime/jobmanager/RecoveryITCase.scala | 9 +- .../runtime/jobmanager/SlotSharingITCase.scala | 5 +- .../TaskManagerFailsWithSlotSharingITCase.scala | 4 +- .../api/graph/StreamingJobGraphGenerator.java | 14 +- .../streaming/api/RestartStrategyTest.java | 10 +- .../graph/StreamingJobGraphGeneratorTest.java | 14 +- .../partitioner/RescalePartitionerTest.java | 4 +- .../streaming/runtime/tasks/StreamTaskTest.java | 5 +- .../JobSubmissionFailsITCase.java | 7 +- .../JobManagerHACheckpointRecoveryITCase.java | 3 +- 
.../JobManagerHAJobGraphRecoveryITCase.java | 2 +- .../runtime/NetworkStackThroughputITCase.java | 2 +- .../ZooKeeperLeaderElectionITCase.java | 3 +- .../flink/test/web/WebFrontendITCase.java | 5 +- .../jobmanager/JobManagerFailsITCase.scala | 6 +- .../JobManagerLeaderSessionIDITSuite.scala | 2 +- .../taskmanager/TaskManagerFailsITCase.scala | 20 +-- 65 files changed, 437 insertions(+), 486 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index c27ee74..d27760f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -22,11 +22,10 @@ import com.esotericsoftware.kryo.Serializer; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.util.SerializedValue; -import java.io.IOException; import java.io.Serializable; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; @@ -128,7 +127,7 @@ public class ExecutionConfig implements Serializable { // ------------------------------- User code values -------------------------------------------- - private transient GlobalJobParameters globalJobParameters; + private GlobalJobParameters globalJobParameters; // Serializers and types registered with Kryo and the PojoSerializer // we store them in linked maps/sets to ensure they are registered in order in all kryo instances. 
@@ -145,22 +144,6 @@ public class ExecutionConfig implements Serializable { private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>(); - // ----------------------- Helper values for serialized user objects --------------------------- - - private SerializedValue<GlobalJobParameters> serializedGlobalJobParameters; - - private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedRegisteredTypesWithKryoSerializers; - - private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedRegisteredTypesWithKryoSerializerClasses; - - private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedDefaultKryoSerializers; - - private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedDefaultKryoSerializerClasses; - - private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredKryoTypes; - - private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredPojoTypes; - // -------------------------------------------------------------------------------------------- /** @@ -695,79 +678,6 @@ public class ExecutionConfig implements Serializable { this.autoTypeRegistrationEnabled = false; } - /** - * Deserializes user code objects given a user code class loader - * - * @param userCodeClassLoader User code class loader - * @throws IOException Thrown if an IOException occurs while loading the classes - * @throws ClassNotFoundException Thrown if the given class cannot be loaded - */ - public void deserializeUserCode(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { - if (serializedRegisteredKryoTypes != null) { - registeredKryoTypes = serializedRegisteredKryoTypes.deserializeValue(userCodeClassLoader); - } else { - registeredKryoTypes = new LinkedHashSet<>(); - } - - if (serializedRegisteredPojoTypes != null) { - registeredPojoTypes = serializedRegisteredPojoTypes.deserializeValue(userCodeClassLoader); - } else { - 
registeredPojoTypes = new LinkedHashSet<>(); - } - - if (serializedRegisteredTypesWithKryoSerializerClasses != null) { - registeredTypesWithKryoSerializerClasses = serializedRegisteredTypesWithKryoSerializerClasses.deserializeValue(userCodeClassLoader); - } else { - registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>(); - } - - if (serializedRegisteredTypesWithKryoSerializers != null) { - registeredTypesWithKryoSerializers = serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader); - } else { - registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>(); - } - - if (serializedDefaultKryoSerializers != null) { - defaultKryoSerializers = serializedDefaultKryoSerializers.deserializeValue(userCodeClassLoader); - } else { - defaultKryoSerializers = new LinkedHashMap<>(); - - } - - if (serializedDefaultKryoSerializerClasses != null) { - defaultKryoSerializerClasses = serializedDefaultKryoSerializerClasses.deserializeValue(userCodeClassLoader); - } else { - defaultKryoSerializerClasses = new LinkedHashMap<>(); - } - - if (serializedGlobalJobParameters != null) { - globalJobParameters = serializedGlobalJobParameters.deserializeValue(userCodeClassLoader); - } - } - - public void serializeUserCode() throws IOException { - serializedRegisteredKryoTypes = new SerializedValue<>(registeredKryoTypes); - registeredKryoTypes = null; - - serializedRegisteredPojoTypes = new SerializedValue<>(registeredPojoTypes); - registeredPojoTypes = null; - - serializedRegisteredTypesWithKryoSerializerClasses = new SerializedValue<>(registeredTypesWithKryoSerializerClasses); - registeredTypesWithKryoSerializerClasses = null; - - serializedRegisteredTypesWithKryoSerializers = new SerializedValue<>(registeredTypesWithKryoSerializers); - registeredTypesWithKryoSerializers = null; - - serializedDefaultKryoSerializers = new SerializedValue<>(defaultKryoSerializers); - defaultKryoSerializers = null; - - serializedDefaultKryoSerializerClasses = new 
SerializedValue<>(defaultKryoSerializerClasses); - defaultKryoSerializerClasses = null; - - serializedGlobalJobParameters = new SerializedValue<>(globalJobParameters); - globalJobParameters = null; - } - @Override public boolean equals(Object obj) { if (obj instanceof ExecutionConfig) { @@ -854,10 +764,10 @@ public class ExecutionConfig implements Serializable { * Convert UserConfig into a {@code Map<String, String>} representation. * This can be used by the runtime, for example for presenting the user config in the web frontend. * - * @return Key/Value representation of the UserConfig, or null. + * @return Key/Value representation of the UserConfig */ public Map<String, String> toMap() { - return null; + return Collections.emptyMap(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java index 158d971..103e06f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java @@ -18,8 +18,10 @@ package org.apache.flink.api.common; +import org.apache.flink.util.SerializedValue; import org.junit.Test; +import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -74,4 +76,29 @@ public class ExecutionConfigTest { assertEquals(parallelism, config.getParallelism()); } + + /** + * Helper function to create a new ExecutionConfig for tests. 
+ * @return A serialized ExecutionConfig + */ + public static SerializedValue<ExecutionConfig> getSerializedConfig() { + try { + return new SerializedValue<>(new ExecutionConfig()); + } catch (IOException e) { + throw new RuntimeException("Couldn't create new ExecutionConfig for test.", e); + } + } + + /** + * Deserializes the given ExecutionConfig with the System class loader. + * @param serializedConfig The serialized ExecutionConfig + * @return ExecutionConfig + */ + public static ExecutionConfig deserializeConfig(SerializedValue<ExecutionConfig> serializedConfig) { + try { + return serializedConfig.deserializeValue(ExecutionConfigTest.class.getClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Could not deserialize ExecutionConfig for test.", e); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 696a05d..a5ae00c 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -83,7 +83,6 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.StringUtils; import org.apache.flink.util.Visitor; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -215,7 +214,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> { // ----------- finalize the job graph ----------- // create the job graph object - JobGraph graph = new JobGraph(jobId, program.getJobName(), program.getOriginalPlan().getExecutionConfig()); + 
JobGraph graph = new JobGraph(jobId, program.getJobName()); + graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig()); graph.setAllowQueuedScheduling(false); graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout()); @@ -243,18 +243,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> { this.iterations = null; this.iterationStack = null; - try { - // make sure that we can send the ExecutionConfig using the system class loader - graph.getExecutionConfig().serializeUserCode(); - } catch (IOException e) { - throw new CompilerException("Could not serialize the user code object in the " + - "ExecutionConfig.", e); - } - // return job graph return graph; } - + /** * This methods implements the pre-visiting during a depth-first traversal. It create the job vertex and * sets local strategy. http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 0f2f514..cd63630 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -45,7 +45,13 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { gen.writeStringField("jid", graph.getJobID().toString()); gen.writeStringField("name", graph.getJobName()); - ExecutionConfig ec = graph.getExecutionConfig(); + ExecutionConfig ec; + try { + ec = graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Couldn't deserialize ExecutionConfig.", e); + } + 
if (ec != null) { gen.writeObjectFieldStart("execution-config"); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index 1f0b2ef..25dc189 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -93,7 +93,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger { final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); // The JobGraph - final JobGraph jobGraph = new JobGraph(new ExecutionConfig()); + final JobGraph jobGraph = new JobGraph(); final int parallelism = 4; final JobVertex task = new JobVertex("Task"); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index c6ce315..9b1f608 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -77,7 +77,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS); // The JobGraph - final JobGraph jobGraph = new JobGraph(new ExecutionConfig()); + final JobGraph jobGraph = new JobGraph(); final int parallelism = 1; final JobVertex task = new JobVertex("Task"); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 948f6af..2b1c224 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -90,7 +90,7 @@ public final class TaskDeploymentDescriptor implements Serializable { private final SerializedValue<StateHandle<?>> operatorState; /** The execution configuration (see {@link ExecutionConfig}) related to the specific job. 
*/ - private final ExecutionConfig executionConfig; + private final SerializedValue<ExecutionConfig> serializedExecutionConfig; private long recoveryTimestamp; @@ -101,7 +101,7 @@ public final class TaskDeploymentDescriptor implements Serializable { JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, - ExecutionConfig executionConfig, + SerializedValue<ExecutionConfig> serializedExecutionConfig, String taskName, int indexInSubtaskGroup, int numberOfSubtasks, @@ -125,7 +125,7 @@ public final class TaskDeploymentDescriptor implements Serializable { this.jobID = checkNotNull(jobID); this.vertexID = checkNotNull(vertexID); this.executionId = checkNotNull(executionId); - this.executionConfig = checkNotNull(executionConfig); + this.serializedExecutionConfig = checkNotNull(serializedExecutionConfig); this.taskName = checkNotNull(taskName); this.indexInSubtaskGroup = indexInSubtaskGroup; this.numberOfSubtasks = numberOfSubtasks; @@ -146,7 +146,7 @@ public final class TaskDeploymentDescriptor implements Serializable { JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, - ExecutionConfig executionConfig, + SerializedValue<ExecutionConfig> serializedExecutionConfig, String taskName, int indexInSubtaskGroup, int numberOfSubtasks, @@ -164,7 +164,7 @@ public final class TaskDeploymentDescriptor implements Serializable { jobID, vertexID, executionId, - executionConfig, + serializedExecutionConfig, taskName, indexInSubtaskGroup, numberOfSubtasks, @@ -185,8 +185,8 @@ public final class TaskDeploymentDescriptor implements Serializable { * Returns the execution configuration (see {@link ExecutionConfig}) related to the * specific job. 
*/ - public ExecutionConfig getExecutionConfig() { - return executionConfig; + public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() { + return serializedExecutionConfig; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 3796402..5dae785 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -175,7 +175,7 @@ public class ExecutionGraph implements Serializable { // ------ Configuration of the Execution ------- /** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */ - private ExecutionConfig executionConfig; + private SerializedValue<ExecutionConfig> serializedExecutionConfig; /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able * to deploy them immediately. 
*/ @@ -245,7 +245,7 @@ public class ExecutionGraph implements Serializable { JobID jobId, String jobName, Configuration jobConfig, - ExecutionConfig config, + SerializedValue<ExecutionConfig> serializedConfig, FiniteDuration timeout, RestartStrategy restartStrategy) { this( @@ -253,7 +253,7 @@ public class ExecutionGraph implements Serializable { jobId, jobName, jobConfig, - config, + serializedConfig, timeout, restartStrategy, new ArrayList<BlobKey>(), @@ -267,7 +267,7 @@ public class ExecutionGraph implements Serializable { JobID jobId, String jobName, Configuration jobConfig, - ExecutionConfig config, + SerializedValue<ExecutionConfig> serializedConfig, FiniteDuration timeout, RestartStrategy restartStrategy, List<BlobKey> requiredJarFiles, @@ -301,7 +301,7 @@ public class ExecutionGraph implements Serializable { this.requiredJarFiles = requiredJarFiles; this.requiredClasspaths = requiredClasspaths; - this.executionConfig = checkNotNull(config); + this.serializedExecutionConfig = checkNotNull(serializedConfig); this.timeout = timeout; @@ -962,12 +962,12 @@ public class ExecutionGraph implements Serializable { } /** - * Returns the {@link ExecutionConfig}. + * Returns the serialized {@link ExecutionConfig}. 
* * @return ExecutionConfig */ - public ExecutionConfig getExecutionConfig() { - return executionConfig; + public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() { + return serializedExecutionConfig; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 4d27423..cbc47a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -667,7 +667,7 @@ public class ExecutionVertex implements Serializable { consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions)); } - ExecutionConfig config = getExecutionGraph().getExecutionConfig(); + SerializedValue<ExecutionConfig> serializedConfig = getExecutionGraph().getSerializedExecutionConfig(); List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles(); List<URL> classpaths = getExecutionGraph().getRequiredClasspaths(); @@ -675,7 +675,7 @@ public class ExecutionVertex implements Serializable { getJobId(), getJobvertexId(), executionId, - config, + serializedConfig, getTaskName(), subTaskIndex, getTotalNumberOfParallelSubtasks(), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index b7c6551..b3e3739 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -28,6 +28,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import java.io.IOException; import java.io.Serializable; @@ -96,79 +98,66 @@ public class JobGraph implements Serializable { private List<URL> classpaths = Collections.emptyList(); /** Job specific execution config */ - private ExecutionConfig executionConfig; + private SerializedValue<ExecutionConfig> serializedExecutionConfig; // -------------------------------------------------------------------------------------------- /** - * Constructs a new job graph with no name, a random job ID, and the given - * {@link ExecutionConfig}. - * - * @param config The {@link ExecutionConfig} for the job. - */ - public JobGraph(ExecutionConfig config) { - this(null, config); - } - - /** * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, - * and a random job ID. + * and a random job ID. The ExecutionConfig will be serialized and can't be modified afterwards. * * @param jobName The name of the job. - * @param config The execution configuration of the job. */ - public JobGraph(String jobName, ExecutionConfig config) { - this(null, jobName, config); + public JobGraph(String jobName) { + this(null, jobName); } /** * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed), * the given name and the given execution configuration (see {@link ExecutionConfig}). + * The ExecutionConfig will be serialized and can't be modified afterwards. * * @param jobId The id of the job. A random ID is generated, if {@code null} is passed. 
* @param jobName The name of the job. - * @param config The execution configuration of the job. */ - public JobGraph(JobID jobId, String jobName, ExecutionConfig config) { + public JobGraph(JobID jobId, String jobName) { this.jobID = jobId == null ? new JobID() : jobId; this.jobName = jobName == null ? "(unnamed job)" : jobName; - this.executionConfig = config == null ? new ExecutionConfig() : config; + setExecutionConfig(new ExecutionConfig()); } /** * Constructs a new job graph with no name, a random job ID, the given {@link ExecutionConfig}, and - * the given job vertices. + * the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards. * - * @param config The execution configuration of the job. * @param vertices The vertices to add to the graph. */ - public JobGraph(ExecutionConfig config, JobVertex... vertices) { - this(null, config, vertices); + public JobGraph(JobVertex... vertices) { + this(null, vertices); } /** * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, a random job ID, - * and the given job vertices. + * and the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards. * * @param jobName The name of the job. - * @param config The execution configuration of the job. * @param vertices The vertices to add to the graph. */ - public JobGraph(String jobName, ExecutionConfig config, JobVertex... vertices) { - this(null, jobName, config, vertices); + public JobGraph(String jobName, JobVertex... vertices) { + this(null, jobName, vertices); } /** * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, * the given jobId or a random one if null supplied, and the given job vertices. + * The ExecutionConfig will be serialized and can't be modified afterwards. * * @param jobId The id of the job. A random ID is generated, if {@code null} is passed. * @param jobName The name of the job. 
- * @param config The execution configuration of the job. * @param vertices The vertices to add to the graph. */ - public JobGraph(JobID jobId, String jobName, ExecutionConfig config, JobVertex... vertices) { - this(jobId, jobName, config); + public JobGraph(JobID jobId, String jobName, JobVertex... vertices) { + this(jobId, jobName); for (JobVertex vertex : vertices) { addVertex(vertex); @@ -210,8 +199,8 @@ public class JobGraph implements Serializable { * * @return ExecutionConfig */ - public ExecutionConfig getExecutionConfig() { - return executionConfig; + public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() { + return serializedExecutionConfig; } /** @@ -249,6 +238,20 @@ public class JobGraph implements Serializable { } /** + * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig + * object will not affect this serialized copy. + * @param executionConfig The ExecutionConfig to be serialized. + */ + public void setExecutionConfig(ExecutionConfig executionConfig) { + Preconditions.checkNotNull(executionConfig, "ExecutionConfig must not be null."); + try { + this.serializedExecutionConfig = new SerializedValue<>(executionConfig); + } catch (IOException e) { + throw new RuntimeException("Could not serialize ExecutionConfig.", e); + } + } + + /** * Adds a new task vertex to the job graph if it is not already included. 
* * @param vertex http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1ae0053..251673f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -219,11 +219,11 @@ public class Task implements Runnable { private volatile long recoveryTs; - /** The job specific execution configuration (see {@link ExecutionConfig}). */ - private final ExecutionConfig executionConfig; + /** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */ + private final SerializedValue<ExecutionConfig> serializedExecutionConfig; - /** Interval between two successive task cancellation attempts */ - private final long taskCancellationInterval; + /** Initialized from the Flink configuration. 
May also be set at the ExecutionConfig */ + private long taskCancellationInterval; /** * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to @@ -253,7 +253,11 @@ public class Task implements Runnable { this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName()); this.operatorState = tdd.getOperatorState(); this.recoveryTs = tdd.getRecoveryTimestamp(); - this.executionConfig = checkNotNull(tdd.getExecutionConfig()); + this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig()); + + this.taskCancellationInterval = jobConfiguration.getLong( + ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, + ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS); this.memoryManager = checkNotNull(memManager); this.ioManager = checkNotNull(ioManager); @@ -271,15 +275,6 @@ public class Task implements Runnable { this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>(); - if (executionConfig.getTaskCancellationInterval() < 0) { - taskCancellationInterval = jobConfiguration.getLong( - ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, - ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS); - } else { - taskCancellationInterval = executionConfig.getTaskCancellationInterval(); - } - - // create the reader and writer structures final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')'; @@ -467,9 +462,14 @@ public class Task implements Runnable { // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes LOG.info("Loading JAR files for task " + taskNameWithSubtask); + final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache); + final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader); - executionConfig.deserializeUserCode(userCodeClassLoader); + if (executionConfig.getTaskCancellationInterval() >= 0) { + // override task cancellation interval from Flink 
config if set in ExecutionConfig + taskCancellationInterval = executionConfig.getTaskCancellationInterval(); + } // now load the task's invokable code invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d8b8a01..3c633f3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -28,7 +28,6 @@ import akka.actor._ import akka.pattern.ask import grizzled.slf4j.Logger -import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} @@ -46,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory} +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator @@ -1069,11 +1068,14 @@ class JobManager( throw new JobSubmissionException(jobId, "The given job is empty") } 
- val restartStrategy = Option(jobGraph.getExecutionConfig().getRestartStrategy()) - .map(RestartStrategyFactory.createRestartStrategy(_)) match { - case Some(strategy) => strategy - case None => restartStrategyFactory.createRestartStrategy() - } + val restartStrategy = + Option(jobGraph.getSerializedExecutionConfig() + .deserializeValue(userCodeLoader) + .getRestartStrategy()) + .map(RestartStrategyFactory.createRestartStrategy(_)) match { + case Some(strategy) => strategy + case None => restartStrategyFactory.createRestartStrategy() + } log.info(s"Using restart strategy $restartStrategy for $jobId.") @@ -1088,7 +1090,7 @@ class JobManager( jobGraph.getJobID, jobGraph.getName, jobGraph.getJobConfiguration, - jobGraph.getExecutionConfig, + jobGraph.getSerializedExecutionConfig, timeout, restartStrategy, jobGraph.getUserJarBlobKeys, @@ -1197,12 +1199,13 @@ class JobManager( new SimpleCheckpointStatsTracker(historySize, ackVertices) } - val jobParallelism = jobGraph.getExecutionConfig.getParallelism() + val jobParallelism = jobGraph.getSerializedExecutionConfig + .deserializeValue(userCodeLoader).getParallelism() val parallelism = if (jobParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { numSlots } else { - jobGraph.getExecutionConfig.getParallelism + jobParallelism } executionGraph.enableSnapshotCheckpointing( http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 03ff83d..91a83b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -61,7 
+61,7 @@ public class CoordinatorShutdownTest { vertex.setInvokableClass(Tasks.NoOpInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); - JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex); + JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000, 60000, 0L, Integer.MAX_VALUE)); @@ -113,7 +113,7 @@ public class CoordinatorShutdownTest { vertex.setInvokableClass(Tasks.NoOpInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); - JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex); + JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000, 60000, 0L, Integer.MAX_VALUE)); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 965556f..a801348 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.checkpoint; import akka.actor.ActorSystem; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ 
-50,7 +50,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { new JobID(), "test", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), new FiniteDuration(1, TimeUnit.DAYS), new NoRestartStrategy(), Collections.<BlobKey>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java index 865760e..cc1994a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java @@ -95,7 +95,7 @@ public class JobClientActorRecoveryITCase extends TestLogger { JobVertex blockingVertex = new JobVertex("Blocking Vertex"); blockingVertex.setInvokableClass(BlockingTask.class); blockingVertex.setParallelism(1); - final JobGraph jobGraph = new JobGraph("Blocking Test Job", new ExecutionConfig(), blockingVertex); + final JobGraph jobGraph = new JobGraph("Blocking Test Job", blockingVertex); final Promise<JobExecutionResult> promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index ee1fd60..073164c0 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit; public class JobClientActorTest extends TestLogger { private static ActorSystem system; - private static JobGraph testJobGraph = new JobGraph("Test Job", new ExecutionConfig()); + private static JobGraph testJobGraph = new JobGraph("Test Job"); @BeforeClass public static void setup() { http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 63e62bf..36744a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.SerializedValue; import org.junit.Test; public class TaskDeploymentDescriptorTest { @@ -55,7 +57,7 @@ public class TaskDeploymentDescriptorTest { final 
List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0); final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0); final List<URL> requiredClasspaths = new ArrayList<URL>(0); - final ExecutionConfig executionConfig = new ExecutionConfig(); + final SerializedValue<ExecutionConfig> executionConfig = ExecutionConfigTest.getSerializedConfig(); final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, executionConfig, taskName, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, @@ -78,9 +80,7 @@ public class TaskDeploymentDescriptorTest { assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber()); assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions()); assertEquals(orig.getInputGates(), copy.getInputGates()); - // load serialized values in ExecutionConfig - copy.getExecutionConfig().deserializeUserCode(getClass().getClassLoader()); - assertEquals(orig.getExecutionConfig(), copy.getExecutionConfig()); + assertEquals(orig.getSerializedExecutionConfig(), copy.getSerializedExecutionConfig()); assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles()); assertEquals(orig.getRequiredClasspaths(), copy.getRequiredClasspaths()); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index d845d01..8eebe66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -26,7 +26,7 @@ import static 
org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -107,7 +107,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -152,7 +152,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -220,7 +220,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -475,7 +475,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -532,7 +532,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -594,7 +594,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -640,7 +640,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -705,14 +705,14 @@ public class 
ExecutionGraphConstructionTest { JobVertex v8 = new JobVertex("vertex8"); v8.setParallelism(2); - JobGraph jg = new JobGraph(jobId, jobName, new ExecutionConfig(), v1, v2, v3, v4, v5, v6, v7, v8); + JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 7a9cee7..d126acb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -30,7 +30,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -84,8 +84,8 @@ public class ExecutionGraphDeploymentTest { TestingUtils.defaultExecutionContext(), jobId, "some job", - new Configuration(), - new ExecutionConfig(), + new Configuration(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -289,7 +289,7 @@ public class ExecutionGraphDeploymentTest { jobId, "some job", new Configuration(), - new ExecutionConfig(), 
+ ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -332,4 +332,4 @@ public class ExecutionGraphDeploymentTest { throw new Exception(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 0837927..01cca5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; @@ -73,14 +74,14 @@ public class ExecutionGraphRestartTest extends TestLogger { sender.setInvokableClass(Tasks.NoOpInvokable.class); sender.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender); + JobGraph jobGraph = new JobGraph("Pointwise job", sender); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -129,13 +130,13 @@ public class ExecutionGraphRestartTest extends TestLogger { 
groupVertex.setStrictlyCoLocatedWith(groupVertex2); //initiate and schedule job - JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), groupVertex, groupVertex2); + JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 0L)); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -184,14 +185,14 @@ public class ExecutionGraphRestartTest extends TestLogger { sender.setInvokableClass(Tasks.NoOpInvokable.class); sender.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender); + JobGraph jobGraph = new JobGraph("Pointwise job", sender); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "Test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000)); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -220,7 +221,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "TestJob", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); @@ -229,7 +230,7 @@ public class ExecutionGraphRestartTest extends TestLogger { jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); jobVertex.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("TestJob", new ExecutionConfig(), jobVertex); + JobGraph jobGraph = new JobGraph("TestJob", jobVertex); executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); 
@@ -279,7 +280,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "TestJob", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); @@ -295,7 +296,7 @@ public class ExecutionGraphRestartTest extends TestLogger { jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); jobVertex.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("TestJob", new ExecutionConfig(), jobVertex); + JobGraph jobGraph = new JobGraph("TestJob", jobVertex); executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -355,14 +356,14 @@ public class ExecutionGraphRestartTest extends TestLogger { sender.setInvokableClass(Tasks.NoOpInvokable.class); sender.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender); + JobGraph jobGraph = new JobGraph("Pointwise job", sender); ExecutionGraph eg = spy(new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "Test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000))); @@ -426,14 +427,14 @@ public class ExecutionGraphRestartTest extends TestLogger { receiver.setInvokableClass(Tasks.NoOpInvokable.class); receiver.setParallelism(1); - JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver); + JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000)); @@ -518,16 +519,18 @@ public class ExecutionGraphRestartTest extends 
TestLogger { vertex.setInvokableClass(Tasks.NoOpInvokable.class); vertex.setParallelism(1); - JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex); - jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart( - Integer.MAX_VALUE, Integer.MAX_VALUE)); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( + Integer.MAX_VALUE, Integer.MAX_VALUE)); + JobGraph jobGraph = new JobGraph("Test Job", vertex); + jobGraph.setExecutionConfig(executionConfig); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000000)); @@ -570,16 +573,18 @@ public class ExecutionGraphRestartTest extends TestLogger { vertex.setInvokableClass(Tasks.NoOpInvokable.class); vertex.setParallelism(1); - JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex); - jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart( - Integer.MAX_VALUE, Integer.MAX_VALUE)); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( + Integer.MAX_VALUE, Integer.MAX_VALUE)); + JobGraph jobGraph = new JobGraph("Test Job", vertex); + jobGraph.setExecutionConfig(executionConfig); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000000)); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index d1bb680..8b04fa3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StoppingException; @@ -128,7 +128,7 @@ public class ExecutionGraphSignalsTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(ordered); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 6659b5a..92a7402 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -22,10 +22,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import java.io.IOException; import java.lang.reflect.Field; import java.net.InetAddress; -import org.apache.flink.api.common.ExecutionConfig; 
+import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; @@ -176,7 +177,7 @@ public class ExecutionGraphTestUtils { new JobID(), "test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -197,7 +198,7 @@ public class ExecutionGraphTestUtils { return ejv; } - public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException { + public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException, IOException { return getExecutionVertex(id, TestingUtils.defaultExecutionContext()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 1ff90e1..9e4aa6d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -1,88 +1,88 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; - -import java.util.Arrays; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.Test; - -public class ExecutionStateProgressTest { - - @Test - public void testAccumulatedStateFinished() { - try { - final JobID jid = new JobID(); - final JobVertexID vid = new JobVertexID(); - - JobVertex ajv = new JobVertex("TestVertex", vid); - ajv.setParallelism(3); - ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); - - ExecutionGraph graph = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jid, - "test job", - new Configuration(), - new ExecutionConfig(), - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); - graph.attachJobGraph(Arrays.asList(ajv)); - - setGraphStatus(graph, JobStatus.RUNNING); - - ExecutionJobVertex ejv = 
graph.getJobVertex(vid); - - // mock resources and mock taskmanager - for (ExecutionVertex ee : ejv.getTaskVertices()) { - SimpleSlot slot = getInstance( - new SimpleActorGateway( - TestingUtils.defaultExecutionContext()) - ).allocateSimpleSlot(jid); - ee.deployToSlot(slot); - } - - // finish all - for (ExecutionVertex ee : ejv.getTaskVertices()) { - ee.executionFinished(); - } - - assertTrue(ejv.isInFinalState()); - assertEquals(JobStatus.FINISHED, graph.getState()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} \ No newline at end of file +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +import java.util.Arrays; + +import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.Test; + +public class ExecutionStateProgressTest { + + @Test + public void testAccumulatedStateFinished() { + try { + final JobID jid = new JobID(); + final JobVertexID vid = new JobVertexID(); + + JobVertex ajv = new JobVertex("TestVertex", vid); + ajv.setParallelism(3); + ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); + + ExecutionGraph graph = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jid, + "test job", + new Configuration(), + ExecutionConfigTest.getSerializedConfig(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); + graph.attachJobGraph(Arrays.asList(ajv)); + + setGraphStatus(graph, JobStatus.RUNNING); + + ExecutionJobVertex ejv = graph.getJobVertex(vid); + + // mock resources and mock taskmanager + for (ExecutionVertex ee : ejv.getTaskVertices()) { + SimpleSlot slot = getInstance( + new SimpleActorGateway( + TestingUtils.defaultExecutionContext()) + ).allocateSimpleSlot(jid); + ee.deployToSlot(slot); + } + + // finish all + for (ExecutionVertex ee : ejv.getTaskVertices()) { + ee.executionFinished(); + } + + 
assertTrue(ejv.isInFinalState()); + assertEquals(JobStatus.FINISHED, graph.getState()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index a4c86e3..d7ce0ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.when; import java.net.InetAddress; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.io.StrictlyLocalAssignment; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; @@ -266,14 +266,14 @@ public class LocalInputSplitsTest { vertex.setInvokableClass(DummyInvokable.class); vertex.setInputSplitSource(new TestInputSplitSource(splits)); - JobGraph jobGraph = new JobGraph("test job", new ExecutionConfig(), vertex); + JobGraph jobGraph = new JobGraph("test job", vertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), TIMEOUT, new NoRestartStrategy()); @@ -331,14 +331,14 @@ public class LocalInputSplitsTest { vertex.setInvokableClass(DummyInvokable.class); vertex.setInputSplitSource(new TestInputSplitSource(splits)); - JobGraph jobGraph = new 
JobGraph("test job", new ExecutionConfig(), vertex); + JobGraph jobGraph = new JobGraph("test job", vertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), TIMEOUT, new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index cbeeded..1b369db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -64,7 +64,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -105,8 +105,8 @@ public class PointwisePatternTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - new ExecutionConfig(), + cfg, + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -148,8 +148,8 @@ public class PointwisePatternTest { 
TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - new ExecutionConfig(), + cfg, + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -192,8 +192,8 @@ public class PointwisePatternTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - new ExecutionConfig(), + cfg, + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -234,8 +234,8 @@ public class PointwisePatternTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - new ExecutionConfig(), + cfg, + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -296,8 +296,8 @@ public class PointwisePatternTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - new ExecutionConfig(), + cfg, + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -349,8 +349,8 @@ public class PointwisePatternTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - new ExecutionConfig(), + cfg, + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index a28fb49..8bc474b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.executiongraph; -import 
org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -188,7 +188,7 @@ public class TerminalStateDeadlockTest { jobId, "test graph", EMPTY_CONFIG, - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), TIMEOUT, new FixedDelayRestartStrategy(1, 0)); } http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java index d866b2f..c483f41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java @@ -26,7 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.DummyActorGateway; @@ -76,14 +76,14 @@ public class VertexLocationConstraintTest { JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID()); jobVertex.setInvokableClass(DummyInvokable.class); jobVertex.setParallelism(2); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); + JobGraph jg = new JobGraph("test job", jobVertex); ExecutionGraph eg = new ExecutionGraph( 
TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -149,14 +149,14 @@ public class VertexLocationConstraintTest { JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID()); jobVertex.setInvokableClass(DummyInvokable.class); jobVertex.setParallelism(2); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); + JobGraph jg = new JobGraph("test job", jobVertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -226,14 +226,14 @@ public class VertexLocationConstraintTest { jobVertex1.setSlotSharingGroup(sharingGroup); jobVertex2.setSlotSharingGroup(sharingGroup); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex1, jobVertex2); + JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); @@ -294,14 +294,14 @@ public class VertexLocationConstraintTest { JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID()); jobVertex.setInvokableClass(DummyInvokable.class); jobVertex.setParallelism(1); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); + JobGraph jg = new JobGraph("test job", jobVertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - new 
ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -360,7 +360,7 @@ public class VertexLocationConstraintTest { jobVertex1.setParallelism(1); jobVertex2.setParallelism(1); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex1, jobVertex2); + JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2); SlotSharingGroup sharingGroup = new SlotSharingGroup(); jobVertex1.setSlotSharingGroup(sharingGroup); @@ -371,7 +371,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); @@ -404,14 +404,14 @@ public class VertexLocationConstraintTest { public void testArchivingClearsFields() { try { JobVertex vertex = new JobVertex("test vertex", new JobVertexID()); - JobGraph jg = new JobGraph("test job", new ExecutionConfig(), vertex); + JobGraph jg = new JobGraph("test job", vertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(vertex)); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 5110249..7a23e26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -24,7 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; @@ -76,7 +76,7 @@ public class VertexSlotSharingTest { new JobID(), "test job", new Configuration(), - new ExecutionConfig(), + ExecutionConfigTest.getSerializedConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(vertices); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 317eed7..af8aa69 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -89,8 +89,7 @@ public class PartialConsumePipelinedResultTest { receiver.connectNewDataSetAsInput( sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - final JobGraph jobGraph = new JobGraph( - "Partial Consume of Pipelined Result", new ExecutionConfig(), sender, receiver); + final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver); final SlotSharingGroup slotSharingGroup = new SlotSharingGroup( sender.getID(), receiver.getID()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index 68b05b2..74f1adf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.*; import java.util.List; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; @@ -32,7 +31,7 @@ public class JobGraphTest { @Test public void testSerialization() { try { - JobGraph jg = new JobGraph("The graph", new ExecutionConfig()); + JobGraph jg = new JobGraph("The graph"); // add some configuration values { @@ -91,7 +90,7 @@ public class JobGraphTest { intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE); intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); - JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), + JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2, target1, target2); List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources(); @@ -136,7 +135,7 @@ public class JobGraphTest { l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); - JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), + JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2); List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources(); @@ -183,7 +182,7 @@ public class JobGraphTest { op2.connectNewDataSetAsInput(source, 
DistributionPattern.POINTWISE); op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE); - JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), source, op1, op2, op3); + JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3); List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources(); assertEquals(4, sorted.size()); @@ -212,7 +211,7 @@ public class JobGraphTest { v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE); v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); - JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4); + JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4); try { jg.getVerticesSortedTopologicallyFromSources(); fail("Failed to raise error on topologically sorting cyclic graph."); @@ -244,7 +243,7 @@ public class JobGraphTest { v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); - JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4, source, target); + JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target); try { jg.getVerticesSortedTopologicallyFromSources(); fail("Failed to raise error on topologically sorting cyclic graph."); http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java index 612f64f..d1d5f03 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java @@ -67,7 +67,7 @@ public class 
JsonGeneratorTest { sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE); sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL); - JobGraph jg = new JobGraph("my job", new ExecutionConfig(), source1, source2, source3, + JobGraph jg = new JobGraph("my job", source1, source2, source3, intermediate1, intermediate2, join1, join2, sink1, sink2); String plan = JsonPlanGenerator.generatePlan(jg);
