This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 24932c1417d306aacebb566bbcee79d24301a183 Author: Aleksandr Iushmanov <aiushma...@confluent.io> AuthorDate: Fri Jun 13 16:45:29 2025 +0100 [FLINK-37701] Fix AdaptiveScheduler ignoring checkpoint states sizes for local recovery adjustment. (cherry picked from commit 8b6f9ce27d620d6bae03bc1f5820b5b317e6da45) --- .../scheduler/adaptive/AdaptiveScheduler.java | 18 +- .../allocator/JobAllocationsInformation.java | 13 +- .../allocator/StateLocalitySlotAssigner.java | 6 +- .../adaptive/allocator/StateSizeEstimates.java | 29 +-- .../runtime/scheduler/SchedulerTestingUtils.java | 74 ++++++-- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 210 +++++++++++++++++++-- .../runtime/scheduler/adaptive/ExecutingTest.java | 9 +- .../allocator/StateLocalitySlotAssignerTest.java | 53 ++++++ .../adaptive/allocator/TestingSlotAllocator.java | 98 +++++++++- tools/maven/checkstyle.xml | 2 +- 10 files changed, 437 insertions(+), 75 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 7f759b1c3c6..c279ff3996f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1125,6 +1125,22 @@ public class AdaptiveScheduler .isPresent(); } + private JobAllocationsInformation getJobAllocationsInformationFromGraphAndState( + @Nullable final ExecutionGraph previousExecutionGraph) { + + CompletedCheckpoint latestCompletedCheckpoint = null; + if (jobGraph.isCheckpointingEnabled()) { + latestCompletedCheckpoint = completedCheckpointStore.getLatestCheckpoint(); + } + + if (previousExecutionGraph == null || latestCompletedCheckpoint == null) { + return JobAllocationsInformation.empty(); + } else { + return JobAllocationsInformation.fromGraphAndState( + previousExecutionGraph, 
latestCompletedCheckpoint); + } + } + private JobSchedulingPlan determineParallelism( SlotAllocator slotAllocator, @Nullable ExecutionGraph previousExecutionGraph) throws NoResourceAvailableException { @@ -1133,7 +1149,7 @@ public class AdaptiveScheduler .determineParallelismAndCalculateAssignment( jobInformation, declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation(), - JobAllocationsInformation.fromGraph(previousExecutionGraph)) + getJobAllocationsInformationFromGraphAndState(previousExecutionGraph)) .orElseThrow( () -> new NoResourceAvailableException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java index b6590264e6b..f7ba0c67ff9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -26,8 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -47,11 +46,11 @@ public class JobAllocationsInformation { this.vertexAllocations = vertexAllocations; } - public static JobAllocationsInformation fromGraph(@Nullable ExecutionGraph graph) { - return graph == null 
- ? empty() - : new JobAllocationsInformation( - calculateAllocations(graph, StateSizeEstimates.fromGraph(graph))); + public static JobAllocationsInformation fromGraphAndState( + final ExecutionGraph graph, final CompletedCheckpoint latestCheckpoint) { + return new JobAllocationsInformation( + calculateAllocations( + graph, StateSizeEstimates.fromGraphAndState(graph, latestCheckpoint))); } public List<VertexAllocationInformation> getAllocations(JobVertexID jobVertexID) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java index 458b15c1041..b9524627508 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java @@ -196,14 +196,16 @@ public class StateLocalitySlotAssigner implements SlotAssigner { private static long estimateSize( KeyGroupRange newRange, VertexAllocationInformation allocation) { KeyGroupRange oldRange = allocation.getKeyGroupRange(); + int numberOfKeyGroups = oldRange.getIntersection(newRange).getNumberOfKeyGroups(); if (allocation.stateSizeInBytes * oldRange.getNumberOfKeyGroups() == 0) { - return 0L; + // As we want to maintain same allocation for local recovery, we should give positive + // score to allocations with the same key group range even when we have no state. + return numberOfKeyGroups > 0 ? 
1 : 0; } // round up to 1 long keyGroupSize = allocation.stateSizeInBytes / Math.min(allocation.stateSizeInBytes, oldRange.getNumberOfKeyGroups()); - int numberOfKeyGroups = oldRange.getIntersection(newRange).getNumberOfKeyGroups(); return numberOfKeyGroups * keyGroupSize; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java index 1a84418e5bb..ddb94e92f33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.state.KeyedStateHandle; -import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.util.HashMap; import java.util.Map; @@ -46,10 +46,6 @@ import static java.util.stream.Collectors.toMap; public class StateSizeEstimates { private final Map<ExecutionVertexID, Long> stateSizes; - public StateSizeEstimates() { - this(emptyMap()); - } - public StateSizeEstimates(Map<ExecutionVertexID, Long> stateSizes) { this.stateSizes = stateSizes; } @@ -58,22 +54,13 @@ public class StateSizeEstimates { return Optional.ofNullable(stateSizes.get(jobVertexId)); } - static StateSizeEstimates empty() { - return new StateSizeEstimates(); - } - - public static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) { - return Optional.ofNullable(executionGraph) - .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator())) - .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore())) - .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint())) - .map( - cp -> - new 
StateSizeEstimates( - merge( - fromCompletedCheckpoint(cp), - mapVerticesToOperators(executionGraph)))) - .orElse(empty()); + public static StateSizeEstimates fromGraphAndState( + @NotNull final ExecutionGraph executionGraph, + @NotNull final CompletedCheckpoint latestCheckpoint) { + return new StateSizeEstimates( + merge( + fromCompletedCheckpoint(latestCheckpoint), + mapVerticesToOperators(executionGraph))); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index 630a0d3917f..135e0737fa3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; @@ -27,7 +28,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; @@ -47,6 +50,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; import 
org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmaster.LogicalSlot; @@ -77,6 +81,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishJobVertex; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; import static org.apache.flink.runtime.util.JobVertexConnectionUtils.connectNewDataSetAsInput; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -85,6 +90,8 @@ import static org.assertj.core.api.Assertions.fail; public class SchedulerTestingUtils { private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000; + private static final long RETRY_INTERVAL_MILLIS = 10L; + private static final int RETRY_ATTEMPTS = 6000; private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300); @@ -164,7 +171,7 @@ public class SchedulerTestingUtils { } public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts( - DefaultScheduler scheduler) { + SchedulerNG scheduler) { return StreamSupport.stream( scheduler .requestJob() @@ -206,7 +213,7 @@ public class SchedulerTestingUtils { scheduler.updateTaskExecutionState(new TaskExecutionState(attemptID, executionState)); } - public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) { + public static void setAllExecutionsToRunning(final SchedulerNG scheduler) { getAllCurrentExecutionAttempts(scheduler) .forEach( (attemptId) -> { @@ -240,6 +247,22 @@ public class SchedulerTestingUtils { } } + public static void acknowledgePendingCheckpoint( + final SchedulerNG scheduler, + final int checkpointId, + final Map<OperatorID, OperatorSubtaskState> 
subtaskStateMap) { + getAllCurrentExecutionAttempts(scheduler) + .forEach( + (executionAttemptID) -> { + scheduler.acknowledgeCheckpoint( + scheduler.requestJob().getJobId(), + executionAttemptID, + checkpointId, + new CheckpointMetrics(), + new TaskStateSnapshot(subtaskStateMap)); + }); + } + public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint( DefaultScheduler scheduler) throws Exception { final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); @@ -279,27 +302,40 @@ public class SchedulerTestingUtils { null)); } - public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception { - final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); - checkpointCoordinator.triggerCheckpoint(false); - - assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()) - .as("test setup inconsistent") - .isOne(); - final PendingCheckpoint checkpoint = - checkpointCoordinator.getPendingCheckpoints().values().iterator().next(); - final CompletableFuture<CompletedCheckpoint> future = checkpoint.getCompletionFuture(); + @SuppressWarnings("deprecation") + public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) { + return scheduler.getCheckpointCoordinator(); + } - acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointID()); + public static void waitForJobStatusRunning(final SchedulerNG scheduler) throws Exception { + waitUntilCondition( + () -> scheduler.requestJobStatus() == JobStatus.RUNNING, + RETRY_INTERVAL_MILLIS, + RETRY_ATTEMPTS); + } - CompletedCheckpoint completed = future.getNow(null); - assertThat(completed).withFailMessage("checkpoint not complete").isNotNull(); - return completed; + public static void waitForCheckpointInProgress(final SchedulerNG scheduler) throws Exception { + waitUntilCondition( + () -> + scheduler + .requestCheckpointStats() + .getCounts() + .getNumberOfInProgressCheckpoints() + > 0, + 
RETRY_INTERVAL_MILLIS, + RETRY_ATTEMPTS); } - @SuppressWarnings("deprecation") - public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) { - return scheduler.getCheckpointCoordinator(); + public static void waitForCompletedCheckpoint(final SchedulerNG scheduler) throws Exception { + waitUntilCondition( + () -> + scheduler + .requestCheckpointStats() + .getCounts() + .getNumberOfCompletedCheckpoints() + > 0, + RETRY_INTERVAL_MILLIS, + RETRY_ATTEMPTS); } private static ExecutionJobVertex getJobVertex( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 54695b78b15..dc11dc8bf25 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -24,19 +24,23 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.configuration.TraceOptions; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.core.failure.TestingFailureEnricher; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.OperatorIDPair; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointStatsListener; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; 
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; @@ -55,7 +59,10 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy; @@ -100,13 +107,17 @@ import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator; import 
org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -139,8 +150,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -161,16 +174,24 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph; import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint; import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning; +import static org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator.getArgumentCapturingDelegatingSlotAllocator; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for the {@link AdaptiveScheduler}. */ public class AdaptiveSchedulerTest { @@ -189,6 +210,8 @@ public class AdaptiveSchedulerTest { private static final TestExecutorExtension<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = new TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor); + public static final int CHECKPOINT_TIMEOUT_SECONDS = 10; + private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor = new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread()); @@ -257,6 +280,10 @@ public class AdaptiveSchedulerTest { CompletableFuture.runAsync(callback, singleThreadMainThreadExecutor).join(); } + private <T> T supplyInMainThread(Supplier<T> supplier) throws Exception { + return CompletableFuture.supplyAsync(supplier, singleThreadMainThreadExecutor).get(); + } + @Test void testInitialState() throws Exception { scheduler = @@ -2057,10 +2084,7 @@ public class AdaptiveSchedulerTest { void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() throws Exception { - final TestingSlotAllocator slotAllocator = - TestingSlotAllocator.newBuilder() - 
.setTryReserveResourcesFunction(ignored -> Optional.empty()) - .build(); + final TestingSlotAllocator slotAllocator = TestingSlotAllocator.newBuilder().build(); scheduler = new AdaptiveSchedulerBuilder( @@ -2078,6 +2102,126 @@ public class AdaptiveSchedulerTest { assertThat(assignmentResult.isSuccess()).isFalse(); } + @Test + void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { + final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX); + final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); + final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); + final boolean localRecoveryEnabled = true; + final String executionTarget = "local"; + final boolean minimalTaskManagerPreferred = false; + final SlotAllocator slotAllocator = + getArgumentCapturingDelegatingSlotAllocator( + AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( + slotPool, + localRecoveryEnabled, + executionTarget, + minimalTaskManagerPreferred), + capturedAllocations); + + scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setDeclarativeSlotPool(slotPool) + .setSlotAllocator(slotAllocator) + .setStateTransitionManagerFactory( + createAutoAdvanceStateTransitionManagerFactory()) + .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)) + .build(); + + // Start scheduler + startTestInstanceInMainThread(); + + // Transition job and all subtasks to RUNNING state. + waitForJobStatusRunning(scheduler); + runInMainThread(() -> setAllExecutionsToRunning(scheduler)); + + // Trigger a checkpoint + CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = + supplyInMainThread(() -> scheduler.triggerCheckpoint(CheckpointType.FULL)); + + // Verify that checkpoint was registered by scheduler. Required to prevent race condition + // when checkpoint is acknowledged before start. 
+ waitForCheckpointInProgress(scheduler); + + // Acknowledge the checkpoint for all tasks with the fake state. + final Map<OperatorID, OperatorSubtaskState> operatorStates = + generateFakeKeyedManagedStateForAllOperators(jobGraph); + runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, operatorStates)); + + // Wait for the checkpoint to complete. + final CompletedCheckpoint completedCheckpoint = completedCheckpointFuture.join(); + + // completedCheckpointStore.getLatestCheckpoint() can return null if called immediately + // after the checkpoint is completed. + waitForCompletedCheckpoint(scheduler); + + // Fail early if the checkpoint is null. + assertThat(completedCheckpoint).withFailMessage("Checkpoint shouldn't be null").isNotNull(); + + // Emulating new graph creation call on job recovery to ensure that the state is considered + // for new allocations. + final List<ExecutionAttemptID> executionAttemptIds = + supplyInMainThread( + () -> { + final Optional<ExecutionGraph> maybeExecutionGraph = + scheduler + .getState() + .as(StateWithExecutionGraph.class) + .map(StateWithExecutionGraph::getExecutionGraph); + assertThat(maybeExecutionGraph).isNotEmpty(); + final ExecutionVertex[] taskVertices = + Objects.requireNonNull( + maybeExecutionGraph + .get() + .getJobVertex(JOB_VERTEX.getID())) + .getTaskVertices(); + return Arrays.stream(taskVertices) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getAttemptId) + .collect(Collectors.toList()); + }); + + assertThat(executionAttemptIds).hasSize(PARALLELISM); + + runInMainThread( + () -> { + // fail one of the vertices + scheduler.updateTaskExecutionState( + new TaskExecutionState( + executionAttemptIds.get(0), + ExecutionState.FAILED, + new Exception("Test exception for local recovery"))); + }); + + runInMainThread( + () -> { + // cancel remaining vertices + for (int idx = 1; idx < executionAttemptIds.size(); idx++) { + scheduler.updateTaskExecutionState( + new TaskExecutionState( + 
executionAttemptIds.get(idx), ExecutionState.CANCELED)); + } + }); + + waitForJobStatusRunning(scheduler); + + // First allocation during the job start + second allocation after job restart. + assertThat(capturedAllocations).hasSize(2); + // First allocation won't use state data. + assertTrue(capturedAllocations.get(0).isEmpty()); + // Second allocation should use data from latest checkpoint. + assertThat( + capturedAllocations + .get(1) + .getAllocations(JOB_VERTEX.getID()) + .get(0) + .stateSizeInBytes) + .isGreaterThan(0); + } + @Test void testComputeVertexParallelismStoreForExecutionInReactiveMode() { JobVertex v1 = createNoOpVertex("v1", 1, 50); @@ -2519,17 +2663,7 @@ public class AdaptiveSchedulerTest { JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_MAX_CHECKPOINT_FAILURES, onFailedCheckpointCount); - final JobGraph jobGraph = - JobGraphBuilder.newStreamingJobGraphBuilder() - .addJobVertices(Collections.singletonList(JOB_VERTEX)) - .setJobCheckpointingSettings( - new JobCheckpointingSettings( - new CheckpointCoordinatorConfiguration - .CheckpointCoordinatorConfigurationBuilder() - .build(), - null)) - .build(); - SchedulerTestingUtils.enableCheckpointing(jobGraph); + final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX); final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(parallelism); final AtomicInteger eventCounter = new AtomicInteger(); @@ -2938,4 +3072,50 @@ public class AdaptiveSchedulerTest { .containsEntry("canRestart", String.valueOf(canRestart)); } } + + private static JobGraph createJobGraphWithCheckpointing(final JobVertex...
jobVertex) { + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Arrays.asList(jobVertex)) + .build(); + SchedulerTestingUtils.enableCheckpointing( + jobGraph, null, null, Duration.ofHours(1).toMillis(), true); + return jobGraph; + } + + private static AdaptiveScheduler.StateTransitionManagerFactory + createAutoAdvanceStateTransitionManagerFactory() { + return (context, + ignoredClock, + ignoredCooldown, + ignoredResourceStabilizationTimeout, + ignoredMaxTriggerDelay) -> + TestingStateTransitionManager.withOnTriggerEventOnly( + () -> { + if (context instanceof WaitingForResources) { + context.transitionToSubsequentState(); + } + }); + } + + private static Map<OperatorID, OperatorSubtaskState> + generateFakeKeyedManagedStateForAllOperators(final JobGraph jobGraph) + throws IOException { + final Map<OperatorID, OperatorSubtaskState> operatorStates = new HashMap<>(); + for (final JobVertex jobVertex : jobGraph.getVertices()) { + final KeyedStateHandle keyedStateHandle = + generateKeyGroupState( + jobVertex.getID(), + KeyGroupRange.of(0, jobGraph.getMaximumParallelism() - 1), + false); + for (OperatorIDPair operatorId : jobVertex.getOperatorIDs()) { + operatorStates.put( + operatorId.getGeneratedOperatorID(), + OperatorSubtaskState.builder() + .setManagedKeyedState(keyedStateHandle) + .build()); + } + } + return operatorStates; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index 509ea292018..862722407de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -1074,9 +1074,16 @@ class ExecutingTest { MockExecutionJobVertex( Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier) throws JobException { + 
this(executionVertexSupplier, new JobVertex("test")); + } + + MockExecutionJobVertex( + final Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier, + final JobVertex jobVertex) + throws JobException { super( new MockInternalExecutionGraphAccessor(), - new JobVertex("test"), + jobVertex, new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()), new CoordinatorStoreImpl(), UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java index 445eedc82ea..9be8dc41ec7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation; import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation.VertexInformation; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -39,6 +41,7 @@ import java.util.stream.IntStream; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static 
org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; @@ -74,6 +77,56 @@ class StateLocalitySlotAssignerTest { verifyAssignments(assignments, newParallelism, allocationWith200bytes); } + @Test + // In case of local recovery, we want to preserve slot allocations even if there is no + // keyed managed state available. + public void testSlotsPreservationWithNoStateSameParallelism() { + final int parallelism = 2; + final VertexInformation vertex = createVertex(parallelism); + final AllocationID allocationID1 = new AllocationID(); + final AllocationID allocationID2 = new AllocationID(); + + final List<VertexAllocationInformation> previousAllocations = + Arrays.asList( + new VertexAllocationInformation( + allocationID1, vertex.getJobVertexID(), KeyGroupRange.of(0, 63), 0), + new VertexAllocationInformation( + allocationID2, + vertex.getJobVertexID(), + KeyGroupRange.of(64, 127), + 0)); + + final Collection<SlotAssignment> assignments = + assign( + vertex, + // Providing allocation IDs in reverse order to check that assigner fixes + // the order based on previous allocations. + Arrays.asList(allocationID2, allocationID1), + previousAllocations); + + // Extract allocation IDs from assignments sorted by subtask index. 
+ final List<AllocationID> subtaskOrderedNewAllocations = + assignments.stream() + .sorted( + Comparator.comparingInt( + assignment -> + assignment + .getTargetAs( + SlotSharingSlotAllocator + .ExecutionSlotSharingGroup + .class) + .getContainedExecutionVertices() + .stream() + .mapToInt( + ExecutionVertexID::getSubtaskIndex) + .findAny() + .orElseThrow())) + .map(assignment -> assignment.getSlotInfo().getAllocationId()) + .collect(Collectors.toList()); + + assertThat(subtaskOrderedNewAllocations).containsExactly(allocationID1, allocationID2); + } + @Test void testSlotsAreNotWasted() { VertexInformation vertex = createVertex(2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java index 0fd638c43bf..b0897d54f53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java @@ -21,9 +21,12 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.function.TriFunction; import java.util.Collection; +import java.util.List; import java.util.Optional; +import java.util.function.BiFunction; import java.util.function.Function; /** Testing implementation of {@link SlotAllocator}. 
*/ @@ -32,14 +35,39 @@ public class TestingSlotAllocator implements SlotAllocator { private final Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction; - private final Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction; + private final Function<JobSchedulingPlan, Optional<ReservedSlots>> tryReserveResourcesFunction; + + private final BiFunction< + JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> + determineParallelismFunction; + + private final TriFunction< + JobInformation, + Collection<? extends SlotInfo>, + JobAllocationsInformation, + Optional<JobSchedulingPlan>> + determineParallelismAndCalculateAssignmentFunction; private TestingSlotAllocator( Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction, - Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) { + Function<JobSchedulingPlan, Optional<ReservedSlots>> tryReserveResourcesFunction, + final BiFunction< + JobInformation, + Collection<? extends SlotInfo>, + Optional<VertexParallelism>> + determineParallelismFunction, + final TriFunction< + JobInformation, + Collection<? extends SlotInfo>, + JobAllocationsInformation, + Optional<JobSchedulingPlan>> + determineParallelismAndCalculateAssignmentFunction) { this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction; this.tryReserveResourcesFunction = tryReserveResourcesFunction; + this.determineParallelismFunction = determineParallelismFunction; + this.determineParallelismAndCalculateAssignmentFunction = + determineParallelismAndCalculateAssignmentFunction; } @Override @@ -51,7 +79,7 @@ public class TestingSlotAllocator implements SlotAllocator { @Override public Optional<VertexParallelism> determineParallelism( JobInformation jobInformation, Collection<? 
extends SlotInfo> slots) { - return Optional.empty(); + return determineParallelismFunction.apply(jobInformation, slots); } @Override @@ -59,12 +87,13 @@ public class TestingSlotAllocator implements SlotAllocator { JobInformation jobInformation, Collection<? extends SlotInfo> slots, JobAllocationsInformation jobAllocationsInformation) { - return Optional.empty(); + return determineParallelismAndCalculateAssignmentFunction.apply( + jobInformation, slots, jobAllocationsInformation); } @Override public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan jobSchedulingPlan) { - return tryReserveResourcesFunction.apply(jobSchedulingPlan.getVertexParallelism()); + return tryReserveResourcesFunction.apply(jobSchedulingPlan); } public static Builder newBuilder() { @@ -75,9 +104,21 @@ public class TestingSlotAllocator implements SlotAllocator { public static final class Builder { private Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction = ignored -> ResourceCounter.empty(); - private Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction = + private Function<JobSchedulingPlan, Optional<ReservedSlots>> tryReserveResourcesFunction = ignored -> Optional.empty(); + private BiFunction< + JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> + determineSlotsFunction = (jobInformation, slots) -> Optional.empty(); + + private TriFunction< + JobInformation, + Collection<? 
extends SlotInfo>, + JobAllocationsInformation, + Optional<JobSchedulingPlan>> + determineParallelismAndCalculateAssignmentFunction = + (jobInformation, slots, jobAllocationsInformation) -> Optional.empty(); + public Builder setCalculateRequiredSlotsFunction( Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction) { @@ -85,15 +126,56 @@ public class TestingSlotAllocator implements SlotAllocator { return this; } + public Builder setDetermineParallelismFunction( + BiFunction< + JobInformation, + Collection<? extends SlotInfo>, + Optional<VertexParallelism>> + determineParallelismFunction) { + this.determineSlotsFunction = determineParallelismFunction; + return this; + } + + public Builder setDetermineParallelismAndCalculateAssignmentFunction( + TriFunction< + JobInformation, + Collection<? extends SlotInfo>, + JobAllocationsInformation, + Optional<JobSchedulingPlan>> + determineParallelismAndCalculateAssignmentFunction) { + this.determineParallelismAndCalculateAssignmentFunction = + determineParallelismAndCalculateAssignmentFunction; + return this; + } + public Builder setTryReserveResourcesFunction( - Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) { + Function<JobSchedulingPlan, Optional<ReservedSlots>> tryReserveResourcesFunction) { this.tryReserveResourcesFunction = tryReserveResourcesFunction; return this; } public TestingSlotAllocator build() { return new TestingSlotAllocator( - calculateRequiredSlotsFunction, tryReserveResourcesFunction); + calculateRequiredSlotsFunction, + tryReserveResourcesFunction, + determineSlotsFunction, + determineParallelismAndCalculateAssignmentFunction); } } + + public static TestingSlotAllocator getArgumentCapturingDelegatingSlotAllocator( + final SlotAllocator slotAllocator, + final List<JobAllocationsInformation> capturedAllocations) { + return TestingSlotAllocator.newBuilder() + 
.setCalculateRequiredSlotsFunction(slotAllocator::calculateRequiredSlots) + .setTryReserveResourcesFunction(slotAllocator::tryReserveResources) + .setDetermineParallelismFunction(slotAllocator::determineParallelism) + .setDetermineParallelismAndCalculateAssignmentFunction( + (jobInformation, slotInfos, jobAllocationsInformation) -> { + capturedAllocations.add(jobAllocationsInformation); + return slotAllocator.determineParallelismAndCalculateAssignment( + jobInformation, slotInfos, jobAllocationsInformation); + }) + .build(); + } } diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index 03b13fbc676..48823bf62d5 100644 --- a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -61,7 +61,7 @@ This file is based on the checkstyle file of Apache Beam. --> <module name="FileLength"> - <property name="max" value="3100"/> + <property name="max" value="3150"/> </module> <!-- All Java AST specific tests live under TreeWalker module. -->