[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532571#comment-16532571 ]
ASF GitHub Bot commented on FLINK-9693:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6251#discussion_r200075750

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
    @@ -385,6 +381,76 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
     		restartFuture.get();
     	}
     
    +	/**
    +	 * Tests that the task restore state is nulled after the {@link Execution} has been
    +	 * deployed. See FLINK-9693.
    +	 */
    +	@Test
    +	public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
    +		final JobVertex jobVertex = createNoOpJobVertex();
    +		final JobVertexID jobVertexId = jobVertex.getID();
    +
    +		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
    +		final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
    +			1,
    +			Collections.singleton(jobVertexId),
    +			slotOwner);
    +
    +		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
    +			new JobID(),
    +			slotProvider,
    +			new NoRestartStrategy(),
    +			jobVertex);
    +
    +		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
    +
    +		final Execution execution = executionVertex.getCurrentExecutionAttempt();
    +
    +		final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
    +		execution.setInitialState(taskRestoreState);
    +
    +		assertThat(execution.getTaskRestore(), is(notNullValue()));
    +
    +		// schedule the execution vertex and wait for its deployment
    +		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
    +
    +		assertThat(execution.getTaskRestore(), is(nullValue()));
    +	}
    +
    +	@Nonnull
    +	private JobVertex createNoOpJobVertex() {
    +		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +
    +		return jobVertex;
    +	}
    +
    +	@Nonnull
    +	private ProgrammedSlotProvider createProgrammedSlotProvider(
    +		int parallelism,
    +		Collection<JobVertexID> jobVertexIds,
    +		SlotOwner slotOwner) {
    +		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
    +
    +		for (JobVertexID jobVertexId : jobVertexIds) {
    +			for (int i = 0; i < parallelism; i++) {
    +				final SimpleSlot slot = new SimpleSlot(
    +					slotOwner,
    +					new LocalTaskManagerLocation(),
    +					0,
    +					new SimpleAckingTaskManagerGateway(),
    +					null,
    +					null );
    --- End diff --

    True, will remove it.

> Possible memory leak in jobmanager retaining archived checkpoints
> -----------------------------------------------------------------
>
>                 Key: FLINK-9693
>                 URL: https://issues.apache.org/jira/browse/FLINK-9693
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager, State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.6.0
>         Environment: !image.png!!image (1).png!
>            Reporter: Steven Zhen Wu
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job:
> * Flink 1.4.1
> * stand-alone deployment mode
> * embarrassingly parallel: all operators are chained together
> * parallelism is over 1,000
> * stateless except for the Kafka source operators; checkpoint size is 8.4 MB
> * "state.backend.fs.memory-threshold" is set so that only the jobmanager writes checkpoints to S3
> * internal checkpoints, with 10 checkpoints retained in history
>
> Summary of the observations:
> * 41,567 ExecutionVertex objects retained 9+ GB of memory
> * Expanding one ExecutionVertex, it seems to be storing the Kafka offsets for the source operator
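The test in the diff pins down the intended behaviour: an Execution holds its JobManagerTaskRestore only until it has been deployed, after which getTaskRestore() returns null, so archived ExecutionVertex objects should no longer keep checkpoint state (here, the Kafka offsets) reachable on the JobManager heap. The following is a minimal, self-contained sketch of that "hand off, then drop the reference" pattern. The classes RestoreState, DeploymentDescriptor and ExecutionSketch and the method deploy() are hypothetical simplifications for illustration, not Flink's actual Execution implementation.

```java
import java.util.Objects;

public class TaskRestoreNullingSketch {

    /** Hypothetical stand-in for JobManagerTaskRestore: possibly large checkpoint state. */
    static final class RestoreState {
        final long checkpointId;
        final byte[] payload;

        RestoreState(long checkpointId, byte[] payload) {
            this.checkpointId = checkpointId;
            this.payload = payload;
        }
    }

    /** Hypothetical descriptor that carries the restore state to the task being deployed. */
    static final class DeploymentDescriptor {
        final RestoreState restoreState;

        DeploymentDescriptor(RestoreState restoreState) {
            this.restoreState = restoreState;
        }
    }

    /** Hypothetical, heavily simplified execution attempt (not Flink's Execution class). */
    static final class ExecutionSketch {
        private RestoreState taskRestore;

        void setInitialState(RestoreState state) {
            this.taskRestore = Objects.requireNonNull(state);
        }

        RestoreState getTaskRestore() {
            return taskRestore;
        }

        DeploymentDescriptor deploy() {
            // Hand the restore state to the descriptor that is shipped to the task...
            DeploymentDescriptor descriptor = new DeploymentDescriptor(taskRestore);
            // ...then drop this object's own reference, so that an archived
            // execution no longer keeps the checkpoint state reachable.
            taskRestore = null;
            return descriptor;
        }
    }

    public static void main(String[] args) {
        ExecutionSketch execution = new ExecutionSketch();
        execution.setInitialState(new RestoreState(1L, new byte[1024]));
        System.out.println(execution.getTaskRestore() != null); // true

        DeploymentDescriptor descriptor = execution.deploy();
        System.out.println(execution.getTaskRestore() == null); // true
        System.out.println(descriptor.restoreState.checkpointId); // 1
    }
}
```

In this sketch the descriptor still references the state, but it is only needed for the submission itself; once it is discarded, the long-lived execution object no longer pins the state, which is the property the test asserts via getTaskRestore().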