GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6251#discussion_r200075750
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
---
@@ -385,6 +381,76 @@ public void
testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
restartFuture.get();
}
+ /**
+ * Tests that the task restore state is nulled after the {@link
Execution} has been
+ * deployed. See FLINK-9693.
+ */
+ @Test
+ public void testTaskRestoreStateIsNulledAfterDeployment() throws
Exception {
+ final JobVertex jobVertex = createNoOpJobVertex();
+ final JobVertexID jobVertexId = jobVertex.getID();
+
+ final SingleSlotTestingSlotOwner slotOwner = new
SingleSlotTestingSlotOwner();
+ final ProgrammedSlotProvider slotProvider =
createProgrammedSlotProvider(
+ 1,
+ Collections.singleton(jobVertexId),
+ slotOwner);
+
+ ExecutionGraph executionGraph =
ExecutionGraphTestUtils.createSimpleTestGraph(
+ new JobID(),
+ slotProvider,
+ new NoRestartStrategy(),
+ jobVertex);
+
+ ExecutionJobVertex executionJobVertex =
executionGraph.getJobVertex(jobVertexId);
+
+ ExecutionVertex executionVertex =
executionJobVertex.getTaskVertices()[0];
+
+ final Execution execution =
executionVertex.getCurrentExecutionAttempt();
+
+ final JobManagerTaskRestore taskRestoreState = new
JobManagerTaskRestore(1L, new TaskStateSnapshot());
+ execution.setInitialState(taskRestoreState);
+
+ assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+ // schedule the execution vertex and wait for its deployment
+ executionVertex.scheduleForExecution(slotProvider, false,
LocationPreferenceConstraint.ANY).get();
+
+ assertThat(execution.getTaskRestore(), is(nullValue()));
+ }
+
+ @Nonnull
+ private JobVertex createNoOpJobVertex() {
+ final JobVertex jobVertex = new JobVertex("Test vertex", new
JobVertexID());
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+
+ return jobVertex;
+ }
+
+ @Nonnull
+ private ProgrammedSlotProvider createProgrammedSlotProvider(
+ int parallelism,
+ Collection<JobVertexID> jobVertexIds,
+ SlotOwner slotOwner) {
+ final ProgrammedSlotProvider slotProvider = new
ProgrammedSlotProvider(parallelism);
+
+ for (JobVertexID jobVertexId : jobVertexIds) {
+ for (int i = 0; i < parallelism; i++) {
+ final SimpleSlot slot = new SimpleSlot(
+ slotOwner,
+ new LocalTaskManagerLocation(),
+ 0,
+ new SimpleAckingTaskManagerGateway(),
+ null,
+ null );
--- End diff --
True, will remove it.
---