Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6251#discussion_r200075750
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
    @@ -385,6 +381,76 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
                restartFuture.get();
        }
     
    +   /**
    +    * Tests that the task restore state is nulled after the {@link Execution} has been
    +    * deployed. See FLINK-9693.
    +    */
    +   @Test
    +   public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
    +           final JobVertex jobVertex = createNoOpJobVertex();
    +           final JobVertexID jobVertexId = jobVertex.getID();
    +
    +           final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
    +           final ProgrammedSlotProvider slotProvider = 
createProgrammedSlotProvider(
    +                   1,
    +                   Collections.singleton(jobVertexId),
    +                   slotOwner);
    +
    +           ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
    +                   new JobID(),
    +                   slotProvider,
    +                   new NoRestartStrategy(),
    +                   jobVertex);
    +
    +           ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
    +
    +           ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
    +
    +           final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
    +
    +           final JobManagerTaskRestore taskRestoreState = new 
JobManagerTaskRestore(1L, new TaskStateSnapshot());
    +           execution.setInitialState(taskRestoreState);
    +
    +           assertThat(execution.getTaskRestore(), is(notNullValue()));
    +
    +           // schedule the execution vertex and wait for its deployment
    +           executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY).get();
    +
    +           assertThat(execution.getTaskRestore(), is(nullValue()));
    +   }
    +
    +   @Nonnull
    +   private JobVertex createNoOpJobVertex() {
    +           final JobVertex jobVertex = new JobVertex("Test vertex", new 
JobVertexID());
    +           jobVertex.setInvokableClass(NoOpInvokable.class);
    +
    +           return jobVertex;
    +   }
    +
    +   @Nonnull
    +   private ProgrammedSlotProvider createProgrammedSlotProvider(
    +           int parallelism,
    +           Collection<JobVertexID> jobVertexIds,
    +           SlotOwner slotOwner) {
    +           final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
    +
    +           for (JobVertexID jobVertexId : jobVertexIds) {
    +                   for (int i = 0; i < parallelism; i++) {
    +                           final SimpleSlot slot = new SimpleSlot(
    +                                   slotOwner,
    +                                   new LocalTaskManagerLocation(),
    +                                   0,
    +                                   new SimpleAckingTaskManagerGateway(),
    +                                   null,
    +                                   null    );
    --- End diff --
    
    True, will remove it.


---

Reply via email to