This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0b28fadccfb6b0d2a85592ced9e98b03a0c2d3bf
Author: Zhu Zhu <[email protected]>
AuthorDate: Tue Jul 6 11:56:29 2021 +0800

    [FLINK-22677][runtime] AdaptiveScheduler requires partition registration to be completed immediately
---
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 2b84760..d8b87e8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -959,9 +959,15 @@ public class AdaptiveScheduler
             ExecutionGraph executionGraph, ReservedSlots reservedSlots) {
         for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
             final LogicalSlot assignedSlot = reservedSlots.getSlotFor(executionVertex.getID());
-            executionVertex
-                    .getCurrentExecutionAttempt()
-                    .registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
+            final CompletableFuture<Void> registrationFuture =
+                    executionVertex
+                            .getCurrentExecutionAttempt()
+                            .registerProducedPartitions(
+                                    assignedSlot.getTaskManagerLocation(), false);
+            Preconditions.checkState(
+                    registrationFuture.isDone(),
+                    "Partition registration must be completed immediately for reactive mode");
+
             executionVertex.tryAssignResource(assignedSlot);
         }
