[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532571#comment-16532571
 ] 

ASF GitHub Bot commented on FLINK-9693:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6251#discussion_r200075750
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 ---
    @@ -385,6 +381,76 @@ public void 
testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
                restartFuture.get();
        }
     
    +   /**
    +    * Tests that the task restore state is nulled after the {@link 
Execution} has been
    +    * deployed. See FLINK-9693.
    +    */
    +   @Test
    +   public void testTaskRestoreStateIsNulledAfterDeployment() throws 
Exception {
    +           final JobVertex jobVertex = createNoOpJobVertex();
    +           final JobVertexID jobVertexId = jobVertex.getID();
    +
    +           final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
    +           final ProgrammedSlotProvider slotProvider = 
createProgrammedSlotProvider(
    +                   1,
    +                   Collections.singleton(jobVertexId),
    +                   slotOwner);
    +
    +           ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
    +                   new JobID(),
    +                   slotProvider,
    +                   new NoRestartStrategy(),
    +                   jobVertex);
    +
    +           ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
    +
    +           ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
    +
    +           final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
    +
    +           final JobManagerTaskRestore taskRestoreState = new 
JobManagerTaskRestore(1L, new TaskStateSnapshot());
    +           execution.setInitialState(taskRestoreState);
    +
    +           assertThat(execution.getTaskRestore(), is(notNullValue()));
    +
    +           // schedule the execution vertex and wait for its deployment
    +           executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY).get();
    +
    +           assertThat(execution.getTaskRestore(), is(nullValue()));
    +   }
    +
    +   @Nonnull
    +   private JobVertex createNoOpJobVertex() {
    +           final JobVertex jobVertex = new JobVertex("Test vertex", new 
JobVertexID());
    +           jobVertex.setInvokableClass(NoOpInvokable.class);
    +
    +           return jobVertex;
    +   }
    +
    +   @Nonnull
    +   private ProgrammedSlotProvider createProgrammedSlotProvider(
    +           int parallelism,
    +           Collection<JobVertexID> jobVertexIds,
    +           SlotOwner slotOwner) {
    +           final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
    +
    +           for (JobVertexID jobVertexId : jobVertexIds) {
    +                   for (int i = 0; i < parallelism; i++) {
    +                           final SimpleSlot slot = new SimpleSlot(
    +                                   slotOwner,
    +                                   new LocalTaskManagerLocation(),
    +                                   0,
    +                                   new SimpleAckingTaskManagerGateway(),
    +                                   null,
    +                                   null    );
    --- End diff --
    
    True, will remove it.


> Possible memory leak in jobmanager retaining archived checkpoints
> -----------------------------------------------------------------
>
>                 Key: FLINK-9693
>                 URL: https://issues.apache.org/jira/browse/FLINK-9693
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager, State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.6.0
>         Environment: !image.png!!image (1).png!
>            Reporter: Steven Zhen Wu
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, 
> ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to