zentol commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] 
Add elaborated partition release logic
URL: https://github.com/apache/flink/pull/8804#discussion_r299465452
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 ##########
 @@ -76,52 +81,78 @@ public void 
testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E
 
                final ExecutionGraph executionGraph = 
createExecutionGraph(partitionTracker, sourceVertex, operatorVertex, 
sinkVertex);
 
-               executionGraph.start(mainThreadExecutor);
-               executionGraph.scheduleForExecution();
-
                // finish vertices one after another, and verify that the 
appropriate partitions are released
-               // Note: These assertions assume that the ExecutionGraph uses a 
PipelinedRegion based failover strategy
-               final Execution sourceExecution = getExecution(sourceVertex, 
executionGraph);
-               executionGraph.updateState(new 
TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), 
ExecutionState.FINISHED));
-               assertThat(releasedPartitions, empty());
-
-               final Execution operatorExecution = 
getExecution(operatorVertex, executionGraph);
-               executionGraph.updateState(new 
TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(), 
ExecutionState.FINISHED));
-               assertThat(releasedPartitions, hasSize(1));
-               assertThat(releasedPartitions.remove(), equalTo(new 
ResultPartitionID(
-                       
sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
-                       sourceExecution.getAttemptId())));
-
-               final Execution sinkExecution = getExecution(sinkVertex, 
executionGraph);
-               executionGraph.updateState(new 
TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(), 
ExecutionState.FINISHED));
-
-               assertThat(releasedPartitions, hasSize(1));
-               assertThat(releasedPartitions.remove(), equalTo(new 
ResultPartitionID(
-                       
operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
-                       operatorExecution.getAttemptId())));
+               run(() -> {
+                       final Execution sourceExecution = 
getCurrentExecution(sourceVertex, executionGraph);
+                       executionGraph.updateState(new 
TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), 
ExecutionState.FINISHED));
+                       assertThat(releasedPartitions, empty());
+               });
+
+               run(() -> {
+                       final Execution sourceExecution = 
getCurrentExecution(sourceVertex, executionGraph);
+                       final Execution operatorExecution = 
getCurrentExecution(operatorVertex, executionGraph);
+                       executionGraph.updateState(new 
TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(), 
ExecutionState.FINISHED));
+                       assertThat(releasedPartitions, hasSize(1));
+                       assertThat(releasedPartitions.remove(), equalTo(new 
ResultPartitionID(
+                               
sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+                               sourceExecution.getAttemptId())));
+               });
+
+               run(() -> {
+                       final Execution operatorExecution = 
getCurrentExecution(operatorVertex, executionGraph);
+                       final Execution sinkExecution = 
getCurrentExecution(sinkVertex, executionGraph);
+                       executionGraph.updateState(new 
TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(), 
ExecutionState.FINISHED));
+
+                       assertThat(releasedPartitions, hasSize(1));
+                       assertThat(releasedPartitions.remove(), equalTo(new 
ResultPartitionID(
+                               
operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+                               operatorExecution.getAttemptId())));
+               });
        }
 
-       private static Execution getExecution(final JobVertex jobVertex, final 
ExecutionGraph executionGraph) {
+       private static Execution getCurrentExecution(final JobVertex jobVertex, 
final ExecutionGraph executionGraph) {
                return 
executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
        }
 
        private ExecutionGraph createExecutionGraph(final PartitionTracker 
partitionTracker, final JobVertex... vertices) throws Exception {
-               return ExecutionGraphBuilder.buildGraph(
+               final ExecutionGraph executionGraph = 
ExecutionGraphBuilder.buildGraph(
                        null,
                        new JobGraph(new JobID(), "test job", vertices),
                        new Configuration(),
-                       TestingUtils.defaultExecutor(),
-                       TestingUtils.defaultExecutor(),
+                       scheduledExecutorService,
+                       mainThreadExecutor,
                        new TestingSlotProvider(ignored -> 
CompletableFuture.completedFuture(new TestingLogicalSlot())),
                        
ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
                        new StandaloneCheckpointRecoveryFactory(),
                        AkkaUtils.getDefaultTimeout(),
-                       new InfiniteDelayRestartStrategy(1),
+                       new NoRestartStrategy(),
                        new UnregisteredMetricsGroup(),
                        VoidBlobWriter.getInstance(),
                        AkkaUtils.getDefaultTimeout(),
                        log,
                        NettyShuffleMaster.INSTANCE,
                        partitionTracker);
+
+               executionGraph.start(mainThreadExecutor);
+               run (executionGraph::scheduleForExecution);
+
+               return executionGraph;
+       }
+
+       private void run(RunnableWithException r) throws Exception {
 
 Review comment:
   The `TestingComponentMainThreadExecutor` can be used though.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to