Repository: flink Updated Branches: refs/heads/master 2fd2ccb65 -> fb37d51f3
[FLINK-9693] Set Execution#taskRestore to null after deployment Setting the assigned Execution#taskRestore to null after the deployment allows the JobManagerTaskRestore instance to be garbage collected. Furthermore, it won't be archived along with the Execution in the ExecutionVertex in case of a restart. This is especially important when setting state.backend.fs.memory-threshold to larger values because every state below this threshold will be stored in the meta state files and, thus, also in the JobManagerTaskRestore instances. This closes #6251. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb37d51f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb37d51f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb37d51f Branch: refs/heads/master Commit: fb37d51f309fc891d10c20ff25736d957f80367a Parents: 2fd2ccb Author: Till Rohrmann <[email protected]> Authored: Wed Jul 4 11:05:25 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed Jul 4 15:50:07 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 3 + .../runtime/executiongraph/ExecutionTest.java | 112 +++++++++++++++---- 2 files changed, 92 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 64e602f..853732f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ 
-583,6 +583,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution taskRestore, attemptNumber); + // null taskRestore to let it be GC'ed + taskRestore = null; + final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index 99879c0..d3e88e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; @@ -40,8 +42,11 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import javax.annotation.Nonnull; + import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -50,6 +55,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static 
org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -66,9 +73,8 @@ public class ExecutionTest extends TestLogger { */ @Test public void testSlotReleaseOnFailedResourceAssignment() throws Exception { - final JobVertexID jobVertexId = new JobVertexID(); - final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>(); final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); @@ -119,9 +125,8 @@ public class ExecutionTest extends TestLogger { */ @Test public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception { - final JobVertexID jobVertexId = new JobVertexID(); - final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); @@ -169,9 +174,8 @@ public class ExecutionTest extends TestLogger { */ @Test public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception { - final JobVertexID jobVertexId = new JobVertexID(); - final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); @@ -331,22 +335,14 @@ public class ExecutionTest extends TestLogger { */ @Test public void testTerminationFutureIsCompletedAfterSlotRelease() throws 
Exception { - final JobVertexID jobVertexId = new JobVertexID(); - final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); - - final SimpleSlot slot = new SimpleSlot( - slotOwner, - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway(), - null, - null); - - final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); - slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider( + 1, + Collections.singleton(jobVertexId), + slotOwner); ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( new JobID(), @@ -386,6 +382,76 @@ public class ExecutionTest extends TestLogger { } /** + * Tests that the task restore state is nulled after the {@link Execution} has been + * deployed. See FLINK-9693. 
+ */ + @Test + public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception { + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); + + final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); + final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider( + 1, + Collections.singleton(jobVertexId), + slotOwner); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + final Execution execution = executionVertex.getCurrentExecutionAttempt(); + + final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot()); + execution.setInitialState(taskRestoreState); + + assertThat(execution.getTaskRestore(), is(notNullValue())); + + // schedule the execution vertex and wait for its deployment + executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get(); + + assertThat(execution.getTaskRestore(), is(nullValue())); + } + + @Nonnull + private JobVertex createNoOpJobVertex() { + final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID()); + jobVertex.setInvokableClass(NoOpInvokable.class); + + return jobVertex; + } + + @Nonnull + private ProgrammedSlotProvider createProgrammedSlotProvider( + int parallelism, + Collection<JobVertexID> jobVertexIds, + SlotOwner slotOwner) { + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); + + for (JobVertexID jobVertexId : jobVertexIds) { + for (int i = 0; i < parallelism; i++) { + final SimpleSlot slot = new SimpleSlot( + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway(), + null, + null); + + 
slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + } + } + + return slotProvider; + } + + /** * Slot owner which records the first returned slot. */ private static final class SingleSlotTestingSlotOwner implements SlotOwner {
