Repository: flink
Updated Branches:
  refs/heads/master 2fd2ccb65 -> fb37d51f3


[FLINK-9693] Set Execution#taskRestore to null after deployment

Setting the assigned Execution#taskRestore to null after the deployment allows the
JobManagerTaskRestore instance to be garbage collected. Furthermore, it won't be
archived along with the Execution in the ExecutionVertex in case of a restart. This
is especially important when setting state.backend.fs.memory-threshold to larger
values, because every state below this threshold is stored in the meta state files
and, thus, also in the JobManagerTaskRestore instances.

This closes #6251.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb37d51f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb37d51f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb37d51f

Branch: refs/heads/master
Commit: fb37d51f309fc891d10c20ff25736d957f80367a
Parents: 2fd2ccb
Author: Till Rohrmann <[email protected]>
Authored: Wed Jul 4 11:05:25 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Jul 4 15:50:07 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   3 +
 .../runtime/executiongraph/ExecutionTest.java   | 112 +++++++++++++++----
 2 files changed, 92 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 64e602f..853732f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -583,6 +583,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                taskRestore,
                                attemptNumber);
 
+                       // null taskRestore to let it be GC'ed
+                       taskRestore = null;
+
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
 
                        final CompletableFuture<Acknowledge> submitResultFuture 
= taskManagerGateway.submitTask(deployment, rpcTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 99879c0..d3e88e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -40,8 +42,11 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -50,6 +55,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -66,9 +73,8 @@ public class ExecutionTest extends TestLogger {
         */
        @Test
        public void testSlotReleaseOnFailedResourceAssignment() throws 
Exception {
-               final JobVertexID jobVertexId = new JobVertexID();
-               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
-               jobVertex.setInvokableClass(NoOpInvokable.class);
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
 
                final CompletableFuture<LogicalSlot> slotFuture = new 
CompletableFuture<>();
                final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
@@ -119,9 +125,8 @@ public class ExecutionTest extends TestLogger {
         */
        @Test
        public void testSlotReleaseOnExecutionCancellationInScheduled() throws 
Exception {
-               final JobVertexID jobVertexId = new JobVertexID();
-               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
-               jobVertex.setInvokableClass(NoOpInvokable.class);
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
 
                final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
 
@@ -169,9 +174,8 @@ public class ExecutionTest extends TestLogger {
         */
        @Test
        public void testSlotReleaseOnExecutionCancellationInRunning() throws 
Exception {
-               final JobVertexID jobVertexId = new JobVertexID();
-               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
-               jobVertex.setInvokableClass(NoOpInvokable.class);
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
 
                final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
 
@@ -331,22 +335,14 @@ public class ExecutionTest extends TestLogger {
         */
        @Test
        public void testTerminationFutureIsCompletedAfterSlotRelease() throws 
Exception {
-               final JobVertexID jobVertexId = new JobVertexID();
-               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
-               jobVertex.setInvokableClass(NoOpInvokable.class);
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
 
                final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
-
-               final SimpleSlot slot = new SimpleSlot(
-                       slotOwner,
-                       new LocalTaskManagerLocation(),
-                       0,
-                       new SimpleAckingTaskManagerGateway(),
-                       null,
-                       null);
-
-               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
-               slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+               final ProgrammedSlotProvider slotProvider = 
createProgrammedSlotProvider(
+                       1,
+                       Collections.singleton(jobVertexId),
+                       slotOwner);
 
                ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
                        new JobID(),
@@ -386,6 +382,76 @@ public class ExecutionTest extends TestLogger {
        }
 
        /**
+        * Tests that the task restore state is nulled after the {@link 
Execution} has been
+        * deployed. See FLINK-9693.
+        */
+       @Test
+       public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+               final JobVertex noOpVertex = createNoOpJobVertex();
+               final JobVertexID noOpVertexId = noOpVertex.getID();
+
+               final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+                       1,
+                       Collections.singleton(noOpVertexId),
+                       new SingleSlotTestingSlotOwner());
+
+               final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       noOpVertex);
+
+               final ExecutionVertex vertexUnderTest = graph.getJobVertex(noOpVertexId).getTaskVertices()[0];
+               final Execution execution = vertexUnderTest.getCurrentExecutionAttempt();
+
+               // attach restore state to the attempt before it is scheduled
+               execution.setInitialState(new JobManagerTaskRestore(1L, new TaskStateSnapshot()));
+               assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+               // block until the vertex has been scheduled and deployed
+               vertexUnderTest.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+               // after deployment the Execution must no longer reference its restore state
+               assertThat(execution.getTaskRestore(), is(nullValue()));
+       }
+
+       /**
+        * Creates a {@link JobVertex} named "Test vertex" with a fresh {@link JobVertexID}
+        * whose invokable class is {@link NoOpInvokable}.
+        */
+       @Nonnull
+       private JobVertex createNoOpJobVertex() {
+               final JobVertexID vertexId = new JobVertexID();
+               final JobVertex noOpVertex = new JobVertex("Test vertex", vertexId);
+
+               noOpVertex.setInvokableClass(NoOpInvokable.class);
+               return noOpVertex;
+       }
+
+       /**
+        * Creates a {@link ProgrammedSlotProvider} which offers an already completed
+        * {@link SimpleSlot} for every subtask of each of the given job vertices.
+        *
+        * @param parallelism number of subtasks per job vertex; one slot is registered per subtask index
+        * @param jobVertexIds job vertices for which slots are registered
+        * @param slotOwner owner of all created slots
+        * @return slot provider holding one completed slot future per (job vertex, subtask index)
+        */
+       @Nonnull
+       private ProgrammedSlotProvider createProgrammedSlotProvider(
+               int parallelism,
+               Collection<JobVertexID> jobVertexIds,
+               SlotOwner slotOwner) {
+               final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+               for (JobVertexID jobVertexId : jobVertexIds) {
+                       for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
+                               final SimpleSlot slot = new SimpleSlot(
+                                       slotOwner,
+                                       new LocalTaskManagerLocation(),
+                                       0,
+                                       new SimpleAckingTaskManagerGateway(),
+                                       null,
+                                       null);
+
+                               // register the slot under the current subtask index (not a fixed 0) so that
+                               // every subtask of the vertex gets a slot when parallelism > 1
+                               slotProvider.addSlot(jobVertexId, subtaskIndex, CompletableFuture.completedFuture(slot));
+                       }
+               }
+
+               return slotProvider;
+       }
+
+       /**
         * Slot owner which records the first returned slot.
         */
        private static final class SingleSlotTestingSlotOwner implements 
SlotOwner {

Reply via email to