dmvk commented on code in PR #26663: URL: https://github.com/apache/flink/pull/26663#discussion_r2154595031
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2932,4 +3024,54 @@ private static void checkMetrics(List<Span> results, boolean canRestart) { .containsEntry("canRestart", String.valueOf(canRestart)); } } + + private static JobGraph getCheckpointingSingleVertexJobGraph(final JobVertex jobVertex) { + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Collections.singletonList(jobVertex)) + .setJobCheckpointingSettings( + new JobCheckpointingSettings( + new CheckpointCoordinatorConfiguration + .CheckpointCoordinatorConfigurationBuilder() + .build(), + null)) + .build(); + SchedulerTestingUtils.enableCheckpointing(jobGraph); + return jobGraph; + } + + private static AdaptiveScheduler.@NotNull StateTransitionManagerFactory + getAutoAdvanceStateTransitionManagerFactory() { + return (context, + ignoredClock, + ignoredCooldown, + ignoredResourceStabilizationTimeout, + ignoredMaxTriggerDelay) -> + TestingStateTransitionManager.withOnTriggerEventOnly( + () -> { + if (context instanceof WaitingForResources) { + context.transitionToSubsequentState(); + } + }); + } + + private static Map<OperatorID, OperatorSubtaskState> getFakeKeyedManagedStateForAllOperators( Review Comment: ```suggestion private static Map<OperatorID, OperatorSubtaskState> generateManagedKeyedStateForAllOperators( ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java: ########## @@ -58,22 +54,13 @@ public Optional<Long> estimate(ExecutionVertexID jobVertexId) { return Optional.ofNullable(stateSizes.get(jobVertexId)); } - static StateSizeEstimates empty() { - return new StateSizeEstimates(); - } - - public static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) { - return Optional.ofNullable(executionGraph) - .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator())) - .flatMap(coordinator -> 
Optional.ofNullable(coordinator.getCheckpointStore())) - .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint())) - .map( - cp -> - new StateSizeEstimates( - merge( - fromCompletedCheckpoint(cp), - mapVerticesToOperators(executionGraph)))) - .orElse(empty()); + public static StateSizeEstimates fromGraphAndState( + @NotNull final ExecutionGraph executionGraph, + @NotNull final CompletedCheckpoint latestCheckpoint) { Review Comment: nit: we don't explicitly annotate non null parameters anymore; everything is treated as non null unless annotated otherwise ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2932,4 +3024,54 @@ private static void checkMetrics(List<Span> results, boolean canRestart) { .containsEntry("canRestart", String.valueOf(canRestart)); } } + + private static JobGraph getCheckpointingSingleVertexJobGraph(final JobVertex jobVertex) { + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Collections.singletonList(jobVertex)) + .setJobCheckpointingSettings( + new JobCheckpointingSettings( + new CheckpointCoordinatorConfiguration + .CheckpointCoordinatorConfigurationBuilder() + .build(), + null)) + .build(); + SchedulerTestingUtils.enableCheckpointing(jobGraph); + return jobGraph; + } + + private static AdaptiveScheduler.@NotNull StateTransitionManagerFactory + getAutoAdvanceStateTransitionManagerFactory() { + return (context, + ignoredClock, + ignoredCooldown, + ignoredResourceStabilizationTimeout, + ignoredMaxTriggerDelay) -> + TestingStateTransitionManager.withOnTriggerEventOnly( + () -> { + if (context instanceof WaitingForResources) { + context.transitionToSubsequentState(); + } + }); + } + + private static Map<OperatorID, OperatorSubtaskState> getFakeKeyedManagedStateForAllOperators( + final JobGraph jobGraph) throws IOException { + final Map<OperatorID, OperatorSubtaskState> operatorStates = new HashMap<>(); + for (final 
JobVertex jobVertex : jobGraph.getVertices()) { + final KeyedStateHandle keyedStateHandle = + generateKeyGroupState(jobVertex.getID(), KeyGroupRange.of(0, 0), true); Review Comment: ```suggestion generateKeyGroupState(jobVertex.getID(), KeyGroupRange.of(0, jobGraph.getMaximumParallelism() - 1), false); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java: ########## @@ -74,6 +78,56 @@ public void testDownScaleWithUnevenStateSize() { verifyAssignments(assignments, newParallelism, allocationWith200bytes); } + @Test + // In case of local recovery, we want to preserve slot allocations even if there is no + // keyed managed state available. + public void testSlotsPreservationWithNoStateSameParallelism() { + final int parallelism = 2; + final VertexInformation vertex = createVertex(parallelism); + final AllocationID allocationID1 = new AllocationID(); + final AllocationID allocationID2 = new AllocationID(); + + final List<VertexAllocationInformation> previousAllocations = + Arrays.asList( + new VertexAllocationInformation( + allocationID1, vertex.getJobVertexID(), KeyGroupRange.of(0, 63), 0), + new VertexAllocationInformation( + allocationID2, + vertex.getJobVertexID(), + KeyGroupRange.of(64, 127), + 0)); + + final Collection<SlotAssignment> assignments = + assign( + vertex, + // Providing allocation IDs in reverse order to check that assigner fixes + // the order based on previous allocations. + Arrays.asList(allocationID2, allocationID1), + previousAllocations); + + // Extract allocation IDs from assignments sorted by subtask index. 
+ final List<AllocationID> subtaskOrderedNewAllocations = + assignments.stream() + .sorted( + Comparator.comparingInt( + assignment -> + assignment + .getTargetAs( + SlotSharingSlotAllocator + .ExecutionSlotSharingGroup + .class) + .getContainedExecutionVertices() + .stream() + .mapToInt( + ExecutionVertexID::getSubtaskIndex) + .findAny() + .orElseThrow())) + .map(assignment -> assignment.getSlotInfo().getAllocationId()) + .collect(Collectors.toList()); + + assertThat(subtaskOrderedNewAllocations, contains(allocationID1, allocationID2)); Review Comment: nit: we should be using assertj for all new tests ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2072,6 +2097,125 @@ void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() assertThat(assignmentResult.isSuccess()).isFalse(); } + @Test + void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { + final JobGraph jobGraph = getCheckpointingSingleVertexJobGraph(JOB_VERTEX); + final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); + final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); + final boolean localRecoveryEnabled = true; + final String executionTarget = "local"; + final boolean minimalTaskManagerPreferred = false; + final SlotAllocator slotAllocator = + getArgumentCapturingDelegatingSlotAllocator( + AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( + slotPool, + localRecoveryEnabled, + executionTarget, + minimalTaskManagerPreferred), + capturedAllocations); + + scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setDeclarativeSlotPool(slotPool) + .setSlotAllocator(slotAllocator) + .setStateTransitionManagerFactory( + getAutoAdvanceStateTransitionManagerFactory()) + .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)) + .build(); + + // Start 
scheduler + startTestInstanceInMainThread(); + + // Transition job and all subtasks to RUNNING state. + waitForJobStatusRunning(scheduler); + runInMainThread(() -> setAllExecutionsToRunning(scheduler)); + + // Trigger a checkpoint + CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = + supplyInMainThread(() -> scheduler.triggerCheckpoint(CheckpointType.FULL)); + + // Verify that checkpoint was registered by scheduler. + waitForCheckpointInProgress(scheduler); Review Comment: ```suggestion ``` Why do we need this? ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2072,6 +2097,125 @@ void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() assertThat(assignmentResult.isSuccess()).isFalse(); } + @Test + void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { + final JobGraph jobGraph = getCheckpointingSingleVertexJobGraph(JOB_VERTEX); + final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); + final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); + final boolean localRecoveryEnabled = true; + final String executionTarget = "local"; + final boolean minimalTaskManagerPreferred = false; + final SlotAllocator slotAllocator = + getArgumentCapturingDelegatingSlotAllocator( + AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( + slotPool, + localRecoveryEnabled, + executionTarget, + minimalTaskManagerPreferred), + capturedAllocations); + + scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setDeclarativeSlotPool(slotPool) + .setSlotAllocator(slotAllocator) + .setStateTransitionManagerFactory( + getAutoAdvanceStateTransitionManagerFactory()) + .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)) + .build(); + + // Start scheduler + startTestInstanceInMainThread(); + + // Transition job and all 
subtasks to RUNNING state. + waitForJobStatusRunning(scheduler); + runInMainThread(() -> setAllExecutionsToRunning(scheduler)); + + // Trigger a checkpoint + CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = + supplyInMainThread(() -> scheduler.triggerCheckpoint(CheckpointType.FULL)); + + // Verify that checkpoint was registered by scheduler. + waitForCheckpointInProgress(scheduler); + + // Acknowledge the checkpoint for all tasks with the fake state. + final Map<OperatorID, OperatorSubtaskState> operatorStates = + getFakeKeyedManagedStateForAllOperators(jobGraph); + runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, operatorStates)); + + // Wait for the checkpoint to complete. + final CompletedCheckpoint completedCheckpoint = + completedCheckpointFuture.get(CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS); Review Comment: other option is to use `assertThatFuture(future).eventuallySucceeds()` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2932,4 +3024,54 @@ private static void checkMetrics(List<Span> results, boolean canRestart) { .containsEntry("canRestart", String.valueOf(canRestart)); } } + + private static JobGraph getCheckpointingSingleVertexJobGraph(final JobVertex jobVertex) { Review Comment: nit: ```suggestion private static JobGraph createJobGraphWithCheckpointing(JobVertex jobVertex) { ``` That it's single vertex is kind of given by the arguments In theory you could also transform it to: ``` ```suggestion private static JobGraph createJobGraphWithCheckpointing(JobVertex... 
jobVertex) { ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2932,4 +3024,54 @@ private static void checkMetrics(List<Span> results, boolean canRestart) { .containsEntry("canRestart", String.valueOf(canRestart)); } } + + private static JobGraph getCheckpointingSingleVertexJobGraph(final JobVertex jobVertex) { + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Collections.singletonList(jobVertex)) + .setJobCheckpointingSettings( + new JobCheckpointingSettings( + new CheckpointCoordinatorConfiguration + .CheckpointCoordinatorConfigurationBuilder() + .build(), + null)) + .build(); + SchedulerTestingUtils.enableCheckpointing(jobGraph); + return jobGraph; + } + + private static AdaptiveScheduler.@NotNull StateTransitionManagerFactory + getAutoAdvanceStateTransitionManagerFactory() { + return (context, + ignoredClock, + ignoredCooldown, + ignoredResourceStabilizationTimeout, + ignoredMaxTriggerDelay) -> + TestingStateTransitionManager.withOnTriggerEventOnly( + () -> { + if (context instanceof WaitingForResources) { + context.transitionToSubsequentState(); + } + }); + } + + private static Map<OperatorID, OperatorSubtaskState> getFakeKeyedManagedStateForAllOperators( + final JobGraph jobGraph) throws IOException { + final Map<OperatorID, OperatorSubtaskState> operatorStates = new HashMap<>(); + for (final JobVertex jobVertex : jobGraph.getVertices()) { + final KeyedStateHandle keyedStateHandle = + generateKeyGroupState(jobVertex.getID(), KeyGroupRange.of(0, 0), true); + jobVertex + .getOperatorIDs() + .forEach( + operatorID -> { + operatorStates.put( + operatorID.getGeneratedOperatorID(), + OperatorSubtaskState.builder() + .setManagedKeyedState(keyedStateHandle) + .build()); + }); Review Comment: nit: in such cases a simple for loop might be slightly more readable (less nesting), but it's also a matter of preference ```suggestion for (OperatorIDPair
operatorId : jobVertex.getOperatorIDs()) { operatorStates.put( operatorId.getGeneratedOperatorID(), OperatorSubtaskState.builder() .setManagedKeyedState(keyedStateHandle) .build()); } ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2072,6 +2097,125 @@ void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() assertThat(assignmentResult.isSuccess()).isFalse(); } + @Test + void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { + final JobGraph jobGraph = getCheckpointingSingleVertexJobGraph(JOB_VERTEX); + final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); + final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); + final boolean localRecoveryEnabled = true; + final String executionTarget = "local"; + final boolean minimalTaskManagerPreferred = false; + final SlotAllocator slotAllocator = + getArgumentCapturingDelegatingSlotAllocator( + AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( + slotPool, + localRecoveryEnabled, + executionTarget, + minimalTaskManagerPreferred), + capturedAllocations); + + scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setDeclarativeSlotPool(slotPool) + .setSlotAllocator(slotAllocator) + .setStateTransitionManagerFactory( + getAutoAdvanceStateTransitionManagerFactory()) + .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)) + .build(); + + // Start scheduler + startTestInstanceInMainThread(); + + // Transition job and all subtasks to RUNNING state. 
+ waitForJobStatusRunning(scheduler); + runInMainThread(() -> setAllExecutionsToRunning(scheduler)); + + // Trigger a checkpoint + CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = + supplyInMainThread(() -> scheduler.triggerCheckpoint(CheckpointType.FULL)); + + // Verify that checkpoint was registered by scheduler. + waitForCheckpointInProgress(scheduler); + + // Acknowledge the checkpoint for all tasks with the fake state. + final Map<OperatorID, OperatorSubtaskState> operatorStates = + getFakeKeyedManagedStateForAllOperators(jobGraph); + runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, operatorStates)); + + // Wait for the checkpoint to complete. + final CompletedCheckpoint completedCheckpoint = + completedCheckpointFuture.get(CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + // Checkpoint stats must show completed checkpoint before the job is restarted. + waitForCompletedCheckpoint(scheduler); Review Comment: ```suggestion ``` Do we need this? ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2072,6 +2097,125 @@ void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() assertThat(assignmentResult.isSuccess()).isFalse(); } + @Test + void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { + final JobGraph jobGraph = getCheckpointingSingleVertexJobGraph(JOB_VERTEX); + final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); + final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); + final boolean localRecoveryEnabled = true; + final String executionTarget = "local"; + final boolean minimalTaskManagerPreferred = false; + final SlotAllocator slotAllocator = + getArgumentCapturingDelegatingSlotAllocator( + AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( + slotPool, + localRecoveryEnabled, + executionTarget, + minimalTaskManagerPreferred), + capturedAllocations); + + 
scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setDeclarativeSlotPool(slotPool) + .setSlotAllocator(slotAllocator) + .setStateTransitionManagerFactory( + getAutoAdvanceStateTransitionManagerFactory()) + .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)) + .build(); + + // Start scheduler + startTestInstanceInMainThread(); + + // Transition job and all subtasks to RUNNING state. + waitForJobStatusRunning(scheduler); + runInMainThread(() -> setAllExecutionsToRunning(scheduler)); + + // Trigger a checkpoint + CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = + supplyInMainThread(() -> scheduler.triggerCheckpoint(CheckpointType.FULL)); + + // Verify that checkpoint was registered by scheduler. + waitForCheckpointInProgress(scheduler); + + // Acknowledge the checkpoint for all tasks with the fake state. + final Map<OperatorID, OperatorSubtaskState> operatorStates = + getFakeKeyedManagedStateForAllOperators(jobGraph); + runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, operatorStates)); + + // Wait for the checkpoint to complete. + final CompletedCheckpoint completedCheckpoint = + completedCheckpointFuture.get(CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS); Review Comment: ```suggestion final CompletedCheckpoint completedCheckpoint = completedCheckpointFuture.join(); ``` We must avoid relying on timeouts to keep CI stable. We have a generic tooling that will kill the test after a global timeout, while getting full thread dump, so you have further context about what went wrong. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -2932,4 +3024,54 @@ private static void checkMetrics(List<Span> results, boolean canRestart) { .containsEntry("canRestart", String.valueOf(canRestart)); } } + + private static JobGraph getCheckpointingSingleVertexJobGraph(final JobVertex jobVertex) { + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Collections.singletonList(jobVertex)) + .setJobCheckpointingSettings( + new JobCheckpointingSettings( + new CheckpointCoordinatorConfiguration + .CheckpointCoordinatorConfigurationBuilder() + .build(), + null)) + .build(); + SchedulerTestingUtils.enableCheckpointing(jobGraph); + return jobGraph; + } + + private static AdaptiveScheduler.@NotNull StateTransitionManagerFactory + getAutoAdvanceStateTransitionManagerFactory() { Review Comment: nit: `@NotNull` is redundant. Also, if a method creates a new object, the `create` prefix is usually preferred; `get` suggests it returns something that already exists ```suggestion private static AdaptiveScheduler.StateTransitionManagerFactory createAutoAdvanceStateTransitionManagerFactory() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org