[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532571#comment-16532571 ]
ASF GitHub Bot commented on FLINK-9693:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6251#discussion_r200075750
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
@@ -385,6 +381,76 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
         restartFuture.get();
     }
+    /**
+     * Tests that the task restore state is nulled after the {@link Execution} has been
+     * deployed. See FLINK-9693.
+     */
+    @Test
+    public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+        final JobVertex jobVertex = createNoOpJobVertex();
+        final JobVertexID jobVertexId = jobVertex.getID();
+
+        final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+        final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+            1,
+            Collections.singleton(jobVertexId),
+            slotOwner);
+
+        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+            new JobID(),
+            slotProvider,
+            new NoRestartStrategy(),
+            jobVertex);
+
+        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+        final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+        final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
+        execution.setInitialState(taskRestoreState);
+
+        assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+        // schedule the execution vertex and wait for its deployment
+        executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+        assertThat(execution.getTaskRestore(), is(nullValue()));
+    }
+
+    @Nonnull
+    private JobVertex createNoOpJobVertex() {
+        final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+
+        return jobVertex;
+    }
+
+    @Nonnull
+    private ProgrammedSlotProvider createProgrammedSlotProvider(
+        int parallelism,
+        Collection<JobVertexID> jobVertexIds,
+        SlotOwner slotOwner) {
+        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+        for (JobVertexID jobVertexId : jobVertexIds) {
+            for (int i = 0; i < parallelism; i++) {
+                final SimpleSlot slot = new SimpleSlot(
+                    slotOwner,
+                    new LocalTaskManagerLocation(),
+                    0,
+                    new SimpleAckingTaskManagerGateway(),
+                    null,
+                    null );
--- End diff --
True, will remove it.
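For readers skimming the diff, the behaviour the new test checks can be sketched in isolation. This is only an illustration under stated assumptions: `SketchExecution` and its `deploy()` method are hypothetical stand-ins, not Flink's `Execution` class, and the restore state is modelled as a plain `Object`. The idea is that the restore state is only needed to start the task, so the reference is dropped once deployment has happened and the execution graph no longer keeps it reachable.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an execution attempt; not the real Flink class. */
final class SketchExecution {
    // Restore state may be large, so it should not outlive deployment.
    private volatile Object taskRestore;

    void setInitialState(Object restoreState) {
        this.taskRestore = restoreState;
    }

    Object getTaskRestore() {
        return taskRestore;
    }

    /** Simulates deployment: reads the state for the task, then drops the reference. */
    CompletableFuture<Void> deploy() {
        final Object stateForTask = taskRestore;
        // A real implementation would hand stateForTask to the task here (assumption).
        taskRestore = null; // release the reference so the state can be garbage collected
        return CompletableFuture.completedFuture(null);
    }
}
```

Mirroring the test in the diff, `getTaskRestore()` on such an object would return a non-null value after `setInitialState(...)` and `null` once the future returned by `deploy()` has completed.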
> Possible memory leak in jobmanager retaining archived checkpoints
> -----------------------------------------------------------------
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
> Issue Type: Bug
> Components: JobManager, State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
> Reporter: Steven Zhen Wu
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Attachments: 41K_ExecutionVertex_objs_retained_9GB.png,
> ExecutionVertexZoomIn.png
>
>
> First, some context about the job
> * Flink 1.4.1
> * stand-alone deployment mode
> * embarrassingly parallel: all operators are chained together
> * parallelism is over 1,000
> * stateless except for the Kafka source operators; checkpoint size is 8.4 MB
> * set "state.backend.fs.memory-threshold" so that only the jobmanager writes
> checkpoints to S3
> * internal checkpoint with 10 checkpoints retained in history
>
> Summary of the observations
> * 41,567 ExecutionVertex objects retained 9+ GB of memory
> * Expanding one ExecutionVertex shows that it seems to be storing the Kafka
> offsets for the source operator