zentol commented on a change in pull request #8804: [FLINK-12883][WIP][runtime]
Add elaborated partition release logic
URL: https://github.com/apache/flink/pull/8804#discussion_r299465452
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
##########
@@ -76,52 +81,78 @@ public void
testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E
final ExecutionGraph executionGraph =
createExecutionGraph(partitionTracker, sourceVertex, operatorVertex,
sinkVertex);
- executionGraph.start(mainThreadExecutor);
- executionGraph.scheduleForExecution();
-
// finish vertices one after another, and verify that the
appropriate partitions are released
- // Note: These assertions assume that the ExecutionGraph uses a
PipelinedRegion based failover strategy
- final Execution sourceExecution = getExecution(sourceVertex,
executionGraph);
- executionGraph.updateState(new
TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(),
ExecutionState.FINISHED));
- assertThat(releasedPartitions, empty());
-
- final Execution operatorExecution =
getExecution(operatorVertex, executionGraph);
- executionGraph.updateState(new
TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(),
ExecutionState.FINISHED));
- assertThat(releasedPartitions, hasSize(1));
- assertThat(releasedPartitions.remove(), equalTo(new
ResultPartitionID(
-
sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
- sourceExecution.getAttemptId())));
-
- final Execution sinkExecution = getExecution(sinkVertex,
executionGraph);
- executionGraph.updateState(new
TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(),
ExecutionState.FINISHED));
-
- assertThat(releasedPartitions, hasSize(1));
- assertThat(releasedPartitions.remove(), equalTo(new
ResultPartitionID(
-
operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
- operatorExecution.getAttemptId())));
+ run(() -> {
+ final Execution sourceExecution =
getCurrentExecution(sourceVertex, executionGraph);
+ executionGraph.updateState(new
TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(),
ExecutionState.FINISHED));
+ assertThat(releasedPartitions, empty());
+ });
+
+ run(() -> {
+ final Execution sourceExecution =
getCurrentExecution(sourceVertex, executionGraph);
+ final Execution operatorExecution =
getCurrentExecution(operatorVertex, executionGraph);
+ executionGraph.updateState(new
TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(),
ExecutionState.FINISHED));
+ assertThat(releasedPartitions, hasSize(1));
+ assertThat(releasedPartitions.remove(), equalTo(new
ResultPartitionID(
+
sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+ sourceExecution.getAttemptId())));
+ });
+
+ run(() -> {
+ final Execution operatorExecution =
getCurrentExecution(operatorVertex, executionGraph);
+ final Execution sinkExecution =
getCurrentExecution(sinkVertex, executionGraph);
+ executionGraph.updateState(new
TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(),
ExecutionState.FINISHED));
+
+ assertThat(releasedPartitions, hasSize(1));
+ assertThat(releasedPartitions.remove(), equalTo(new
ResultPartitionID(
+
operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
+ operatorExecution.getAttemptId())));
+ });
}
- private static Execution getExecution(final JobVertex jobVertex, final
ExecutionGraph executionGraph) {
+ private static Execution getCurrentExecution(final JobVertex jobVertex,
final ExecutionGraph executionGraph) {
return
executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
}
private ExecutionGraph createExecutionGraph(final PartitionTracker
partitionTracker, final JobVertex... vertices) throws Exception {
- return ExecutionGraphBuilder.buildGraph(
+ final ExecutionGraph executionGraph =
ExecutionGraphBuilder.buildGraph(
null,
new JobGraph(new JobID(), "test job", vertices),
new Configuration(),
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ scheduledExecutorService,
+ mainThreadExecutor,
new TestingSlotProvider(ignored ->
CompletableFuture.completedFuture(new TestingLogicalSlot())),
ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
AkkaUtils.getDefaultTimeout(),
- new InfiniteDelayRestartStrategy(1),
+ new NoRestartStrategy(),
new UnregisteredMetricsGroup(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout(),
log,
NettyShuffleMaster.INSTANCE,
partitionTracker);
+
+ executionGraph.start(mainThreadExecutor);
+ run (executionGraph::scheduleForExecution);
+
+ return executionGraph;
+ }
+
+ private void run(RunnableWithException r) throws Exception {
Review comment:
The `TestingComponentMainThreadExecutor` can be used though.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services