This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24932c1417d306aacebb566bbcee79d24301a183
Author: Aleksandr Iushmanov <aiushma...@confluent.io>
AuthorDate: Fri Jun 13 16:45:29 2025 +0100

    [FLINK-37701] Fix AdaptiveScheduler ignoring checkpoint states sizes for 
local recovery adjustment.
    
    (cherry picked from commit 8b6f9ce27d620d6bae03bc1f5820b5b317e6da45)
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  18 +-
 .../allocator/JobAllocationsInformation.java       |  13 +-
 .../allocator/StateLocalitySlotAssigner.java       |   6 +-
 .../adaptive/allocator/StateSizeEstimates.java     |  29 +--
 .../runtime/scheduler/SchedulerTestingUtils.java   |  74 ++++++--
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 210 +++++++++++++++++++--
 .../runtime/scheduler/adaptive/ExecutingTest.java  |   9 +-
 .../allocator/StateLocalitySlotAssignerTest.java   |  53 ++++++
 .../adaptive/allocator/TestingSlotAllocator.java   |  98 +++++++++-
 tools/maven/checkstyle.xml                         |   2 +-
 10 files changed, 437 insertions(+), 75 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 7f759b1c3c6..c279ff3996f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1125,6 +1125,22 @@ public class AdaptiveScheduler
                 .isPresent();
     }
 
+    private JobAllocationsInformation 
getJobAllocationsInformationFromGraphAndState(
+            @Nullable final ExecutionGraph previousExecutionGraph) {
+
+        CompletedCheckpoint latestCompletedCheckpoint = null;
+        if (jobGraph.isCheckpointingEnabled()) {
+            latestCompletedCheckpoint = 
completedCheckpointStore.getLatestCheckpoint();
+        }
+
+        if (previousExecutionGraph == null || latestCompletedCheckpoint == 
null) {
+            return JobAllocationsInformation.empty();
+        } else {
+            return JobAllocationsInformation.fromGraphAndState(
+                    previousExecutionGraph, latestCompletedCheckpoint);
+        }
+    }
+
     private JobSchedulingPlan determineParallelism(
             SlotAllocator slotAllocator, @Nullable ExecutionGraph 
previousExecutionGraph)
             throws NoResourceAvailableException {
@@ -1133,7 +1149,7 @@ public class AdaptiveScheduler
                 .determineParallelismAndCalculateAssignment(
                         jobInformation,
                         
declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation(),
-                        
JobAllocationsInformation.fromGraph(previousExecutionGraph))
+                        
getJobAllocationsInformationFromGraphAndState(previousExecutionGraph))
                 .orElseThrow(
                         () ->
                                 new NoResourceAvailableException(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java
index b6590264e6b..f7ba0c67ff9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -26,8 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -47,11 +46,11 @@ public class JobAllocationsInformation {
         this.vertexAllocations = vertexAllocations;
     }
 
-    public static JobAllocationsInformation fromGraph(@Nullable ExecutionGraph 
graph) {
-        return graph == null
-                ? empty()
-                : new JobAllocationsInformation(
-                        calculateAllocations(graph, 
StateSizeEstimates.fromGraph(graph)));
+    public static JobAllocationsInformation fromGraphAndState(
+            final ExecutionGraph graph, final CompletedCheckpoint 
latestCheckpoint) {
+        return new JobAllocationsInformation(
+                calculateAllocations(
+                        graph, StateSizeEstimates.fromGraphAndState(graph, 
latestCheckpoint)));
     }
 
     public List<VertexAllocationInformation> getAllocations(JobVertexID 
jobVertexID) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index 458b15c1041..b9524627508 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -196,14 +196,16 @@ public class StateLocalitySlotAssigner implements 
SlotAssigner {
     private static long estimateSize(
             KeyGroupRange newRange, VertexAllocationInformation allocation) {
         KeyGroupRange oldRange = allocation.getKeyGroupRange();
+        int numberOfKeyGroups = 
oldRange.getIntersection(newRange).getNumberOfKeyGroups();
         if (allocation.stateSizeInBytes * oldRange.getNumberOfKeyGroups() == 
0) {
-            return 0L;
+            // As we want to maintain same allocation for local recovery, we 
should give positive
+            // score to allocations with the same key group range even when we 
have no state.
+            return numberOfKeyGroups > 0 ? 1 : 0;
         }
         // round up to 1
         long keyGroupSize =
                 allocation.stateSizeInBytes
                         / Math.min(allocation.stateSizeInBytes, 
oldRange.getNumberOfKeyGroups());
-        int numberOfKeyGroups = 
oldRange.getIntersection(newRange).getNumberOfKeyGroups();
         return numberOfKeyGroups * keyGroupSize;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java
index 1a84418e5bb..ddb94e92f33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
-import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -46,10 +46,6 @@ import static java.util.stream.Collectors.toMap;
 public class StateSizeEstimates {
     private final Map<ExecutionVertexID, Long> stateSizes;
 
-    public StateSizeEstimates() {
-        this(emptyMap());
-    }
-
     public StateSizeEstimates(Map<ExecutionVertexID, Long> stateSizes) {
         this.stateSizes = stateSizes;
     }
@@ -58,22 +54,13 @@ public class StateSizeEstimates {
         return Optional.ofNullable(stateSizes.get(jobVertexId));
     }
 
-    static StateSizeEstimates empty() {
-        return new StateSizeEstimates();
-    }
-
-    public static StateSizeEstimates fromGraph(@Nullable ExecutionGraph 
executionGraph) {
-        return Optional.ofNullable(executionGraph)
-                .flatMap(graph -> 
Optional.ofNullable(graph.getCheckpointCoordinator()))
-                .flatMap(coordinator -> 
Optional.ofNullable(coordinator.getCheckpointStore()))
-                .flatMap(store -> 
Optional.ofNullable(store.getLatestCheckpoint()))
-                .map(
-                        cp ->
-                                new StateSizeEstimates(
-                                        merge(
-                                                fromCompletedCheckpoint(cp),
-                                                
mapVerticesToOperators(executionGraph))))
-                .orElse(empty());
+    public static StateSizeEstimates fromGraphAndState(
+            @NotNull final ExecutionGraph executionGraph,
+            @NotNull final CompletedCheckpoint latestCheckpoint) {
+        return new StateSizeEstimates(
+                merge(
+                        fromCompletedCheckpoint(latestCheckpoint),
+                        mapVerticesToOperators(executionGraph)));
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 630a0d3917f..135e0737fa3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -27,7 +28,9 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -47,6 +50,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -77,6 +81,7 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishJobVertex;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
 import static 
org.apache.flink.runtime.util.JobVertexConnectionUtils.connectNewDataSetAsInput;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -85,6 +90,8 @@ import static org.assertj.core.api.Assertions.fail;
 public class SchedulerTestingUtils {
 
     private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
+    private static final long RETRY_INTERVAL_MILLIS = 10L;
+    private static final int RETRY_ATTEMPTS = 6000;
 
     private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
 
@@ -164,7 +171,7 @@ public class SchedulerTestingUtils {
     }
 
     public static Collection<ExecutionAttemptID> 
getAllCurrentExecutionAttempts(
-            DefaultScheduler scheduler) {
+            SchedulerNG scheduler) {
         return StreamSupport.stream(
                         scheduler
                                 .requestJob()
@@ -206,7 +213,7 @@ public class SchedulerTestingUtils {
         scheduler.updateTaskExecutionState(new TaskExecutionState(attemptID, 
executionState));
     }
 
-    public static void setAllExecutionsToRunning(final DefaultScheduler 
scheduler) {
+    public static void setAllExecutionsToRunning(final SchedulerNG scheduler) {
         getAllCurrentExecutionAttempts(scheduler)
                 .forEach(
                         (attemptId) -> {
@@ -240,6 +247,22 @@ public class SchedulerTestingUtils {
         }
     }
 
+    public static void acknowledgePendingCheckpoint(
+            final SchedulerNG scheduler,
+            final int checkpointId,
+            final Map<OperatorID, OperatorSubtaskState> subtaskStateMap) {
+        getAllCurrentExecutionAttempts(scheduler)
+                .forEach(
+                        (executionAttemptID) -> {
+                            scheduler.acknowledgeCheckpoint(
+                                    scheduler.requestJob().getJobId(),
+                                    executionAttemptID,
+                                    checkpointId,
+                                    new CheckpointMetrics(),
+                                    new TaskStateSnapshot(subtaskStateMap));
+                        });
+    }
+
     public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
             DefaultScheduler scheduler) throws Exception {
         final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
@@ -279,27 +302,40 @@ public class SchedulerTestingUtils {
                                         null));
     }
 
-    public static CompletedCheckpoint takeCheckpoint(DefaultScheduler 
scheduler) throws Exception {
-        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
-        checkpointCoordinator.triggerCheckpoint(false);
-
-        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
-                .as("test setup inconsistent")
-                .isOne();
-        final PendingCheckpoint checkpoint =
-                
checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
-        final CompletableFuture<CompletedCheckpoint> future = 
checkpoint.getCompletionFuture();
+    @SuppressWarnings("deprecation")
+    public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase 
scheduler) {
+        return scheduler.getCheckpointCoordinator();
+    }
 
-        acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointID());
+    public static void waitForJobStatusRunning(final SchedulerNG scheduler) 
throws Exception {
+        waitUntilCondition(
+                () -> scheduler.requestJobStatus() == JobStatus.RUNNING,
+                RETRY_INTERVAL_MILLIS,
+                RETRY_ATTEMPTS);
+    }
 
-        CompletedCheckpoint completed = future.getNow(null);
-        assertThat(completed).withFailMessage("checkpoint not 
complete").isNotNull();
-        return completed;
+    public static void waitForCheckpointInProgress(final SchedulerNG 
scheduler) throws Exception {
+        waitUntilCondition(
+                () ->
+                        scheduler
+                                        .requestCheckpointStats()
+                                        .getCounts()
+                                        .getNumberOfInProgressCheckpoints()
+                                > 0,
+                RETRY_INTERVAL_MILLIS,
+                RETRY_ATTEMPTS);
     }
 
-    @SuppressWarnings("deprecation")
-    public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase 
scheduler) {
-        return scheduler.getCheckpointCoordinator();
+    public static void waitForCompletedCheckpoint(final SchedulerNG scheduler) 
throws Exception {
+        waitUntilCondition(
+                () ->
+                        scheduler
+                                        .requestCheckpointStats()
+                                        .getCounts()
+                                        .getNumberOfCompletedCheckpoints()
+                                > 0,
+                RETRY_INTERVAL_MILLIS,
+                RETRY_ATTEMPTS);
     }
 
     private static ExecutionJobVertex getJobVertex(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 54695b78b15..dc11dc8bf25 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -24,19 +24,23 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.configuration.TraceOptions;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.TestingFailureEnricher;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
@@ -55,7 +59,10 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy;
@@ -100,13 +107,17 @@ import 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -139,8 +150,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -161,16 +174,24 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph;
 import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator.getArgumentCapturingDelegatingSlotAllocator;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for the {@link AdaptiveScheduler}. */
 public class AdaptiveSchedulerTest {
@@ -189,6 +210,8 @@ public class AdaptiveSchedulerTest {
     private static final TestExecutorExtension<ScheduledExecutorService> 
TEST_EXECUTOR_RESOURCE =
             new 
TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor);
 
+    public static final int CHECKPOINT_TIMEOUT_SECONDS = 10;
+
     private final ManuallyTriggeredComponentMainThreadExecutor 
mainThreadExecutor =
             new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
 
@@ -257,6 +280,10 @@ public class AdaptiveSchedulerTest {
         CompletableFuture.runAsync(callback, 
singleThreadMainThreadExecutor).join();
     }
 
+    private <T> T supplyInMainThread(Supplier<T> supplier) throws Exception {
+        return CompletableFuture.supplyAsync(supplier, 
singleThreadMainThreadExecutor).get();
+    }
+
     @Test
     void testInitialState() throws Exception {
         scheduler =
@@ -2057,10 +2084,7 @@ public class AdaptiveSchedulerTest {
     void 
testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable()
             throws Exception {
 
-        final TestingSlotAllocator slotAllocator =
-                TestingSlotAllocator.newBuilder()
-                        .setTryReserveResourcesFunction(ignored -> 
Optional.empty())
-                        .build();
+        final TestingSlotAllocator slotAllocator = 
TestingSlotAllocator.newBuilder().build();
 
         scheduler =
                 new AdaptiveSchedulerBuilder(
@@ -2078,6 +2102,126 @@ public class AdaptiveSchedulerTest {
         assertThat(assignmentResult.isSuccess()).isFalse();
     }
 
+    @Test
+    void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception 
{
+        final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX);
+        final DeclarativeSlotPool slotPool = 
getSlotPoolWithFreeSlots(PARALLELISM);
+        final List<JobAllocationsInformation> capturedAllocations = new 
ArrayList<>();
+        final boolean localRecoveryEnabled = true;
+        final String executionTarget = "local";
+        final boolean minimalTaskManagerPreferred = false;
+        final SlotAllocator slotAllocator =
+                getArgumentCapturingDelegatingSlotAllocator(
+                        
AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
+                                slotPool,
+                                localRecoveryEnabled,
+                                executionTarget,
+                                minimalTaskManagerPreferred),
+                        capturedAllocations);
+
+        scheduler =
+                new AdaptiveSchedulerBuilder(
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .setDeclarativeSlotPool(slotPool)
+                        .setSlotAllocator(slotAllocator)
+                        .setStateTransitionManagerFactory(
+                                
createAutoAdvanceStateTransitionManagerFactory())
+                        .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0L))
+                        .build();
+
+        // Start scheduler
+        startTestInstanceInMainThread();
+
+        // Transition job and all subtasks to RUNNING state.
+        waitForJobStatusRunning(scheduler);
+        runInMainThread(() -> setAllExecutionsToRunning(scheduler));
+
+        // Trigger a checkpoint
+        CompletableFuture<CompletedCheckpoint> completedCheckpointFuture =
+                supplyInMainThread(() -> 
scheduler.triggerCheckpoint(CheckpointType.FULL));
+
+        // Verify that the checkpoint was registered by the scheduler. This is required to
+        // prevent a race condition where the checkpoint is acknowledged before it has started.
+        waitForCheckpointInProgress(scheduler);
+
+        // Acknowledge the checkpoint for all tasks with the fake state.
+        final Map<OperatorID, OperatorSubtaskState> operatorStates =
+                generateFakeKeyedManagedStateForAllOperators(jobGraph);
+        runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, 
operatorStates));
+
+        // Wait for the checkpoint to complete.
+        final CompletedCheckpoint completedCheckpoint = 
completedCheckpointFuture.join();
+
+        // completedCheckpointStore.getLatestCheckpoint() can return null if 
called immediately
+        // after the checkpoint is completed.
+        waitForCompletedCheckpoint(scheduler);
+
+        // Fail early if the checkpoint is null.
+        assertThat(completedCheckpoint).withFailMessage("Checkpoint shouldn't 
be null").isNotNull();
+
+        // Emulating new graph creation call on job recovery to ensure that 
the state is considered
+        // for new allocations.
+        final List<ExecutionAttemptID> executionAttemptIds =
+                supplyInMainThread(
+                        () -> {
+                            final Optional<ExecutionGraph> maybeExecutionGraph 
=
+                                    scheduler
+                                            .getState()
+                                            .as(StateWithExecutionGraph.class)
+                                            
.map(StateWithExecutionGraph::getExecutionGraph);
+                            assertThat(maybeExecutionGraph).isNotEmpty();
+                            final ExecutionVertex[] taskVertices =
+                                    Objects.requireNonNull(
+                                                    maybeExecutionGraph
+                                                            .get()
+                                                            
.getJobVertex(JOB_VERTEX.getID()))
+                                            .getTaskVertices();
+                            return Arrays.stream(taskVertices)
+                                    
.map(ExecutionVertex::getCurrentExecutionAttempt)
+                                    .map(Execution::getAttemptId)
+                                    .collect(Collectors.toList());
+                        });
+
+        assertThat(executionAttemptIds).hasSize(PARALLELISM);
+
+        runInMainThread(
+                () -> {
+                    // fail one of the vertices
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionState(
+                                    executionAttemptIds.get(0),
+                                    ExecutionState.FAILED,
+                                    new Exception("Test exception for local 
recovery")));
+                });
+
+        runInMainThread(
+                () -> {
+                    // cancel remaining vertices
+                    for (int idx = 1; idx < executionAttemptIds.size(); idx++) 
{
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        executionAttemptIds.get(idx), 
ExecutionState.CANCELED));
+                    }
+                });
+
+        waitForJobStatusRunning(scheduler);
+
+        // First allocation during the job start + second allocation after job 
restart.
+        assertThat(capturedAllocations).hasSize(2);
+        // First allocation won't use state data.
+        assertTrue(capturedAllocations.get(0).isEmpty());
+        // Second allocation should use data from latest checkpoint.
+        assertThat(
+                        capturedAllocations
+                                .get(1)
+                                .getAllocations(JOB_VERTEX.getID())
+                                .get(0)
+                                .stateSizeInBytes)
+                .isGreaterThan(0);
+    }
+
     @Test
     void testComputeVertexParallelismStoreForExecutionInReactiveMode() {
         JobVertex v1 = createNoOpVertex("v1", 1, 50);
@@ -2519,17 +2663,7 @@ public class AdaptiveSchedulerTest {
                 
JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_MAX_CHECKPOINT_FAILURES,
                 onFailedCheckpointCount);
 
-        final JobGraph jobGraph =
-                JobGraphBuilder.newStreamingJobGraphBuilder()
-                        .addJobVertices(Collections.singletonList(JOB_VERTEX))
-                        .setJobCheckpointingSettings(
-                                new JobCheckpointingSettings(
-                                        new CheckpointCoordinatorConfiguration
-                                                        
.CheckpointCoordinatorConfigurationBuilder()
-                                                .build(),
-                                        null))
-                        .build();
-        SchedulerTestingUtils.enableCheckpointing(jobGraph);
+        final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX);
 
         final DeclarativeSlotPool slotPool = 
getSlotPoolWithFreeSlots(parallelism);
         final AtomicInteger eventCounter = new AtomicInteger();
@@ -2938,4 +3072,50 @@ public class AdaptiveSchedulerTest {
                     .containsEntry("canRestart", String.valueOf(canRestart));
         }
     }
+
+    private static JobGraph createJobGraphWithCheckpointing(final JobVertex... 
jobVertex) {
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .addJobVertices(Arrays.asList(jobVertex))
+                        .build();
+        SchedulerTestingUtils.enableCheckpointing(
+                jobGraph, null, null, Duration.ofHours(1).toMillis(), true);
+        return jobGraph;
+    }
+
+    private static AdaptiveScheduler.StateTransitionManagerFactory
+            createAutoAdvanceStateTransitionManagerFactory() {
+        return (context,
+                ignoredClock,
+                ignoredCooldown,
+                ignoredResourceStabilizationTimeout,
+                ignoredMaxTriggerDelay) ->
+                TestingStateTransitionManager.withOnTriggerEventOnly(
+                        () -> {
+                            if (context instanceof WaitingForResources) {
+                                context.transitionToSubsequentState();
+                            }
+                        });
+    }
+
+    private static Map<OperatorID, OperatorSubtaskState>
+            generateFakeKeyedManagedStateForAllOperators(final JobGraph 
jobGraph)
+                    throws IOException {
+        final Map<OperatorID, OperatorSubtaskState> operatorStates = new 
HashMap<>();
+        for (final JobVertex jobVertex : jobGraph.getVertices()) {
+            final KeyedStateHandle keyedStateHandle =
+                    generateKeyGroupState(
+                            jobVertex.getID(),
+                            KeyGroupRange.of(0, 
jobGraph.getMaximumParallelism() - 1),
+                            false);
+            for (OperatorIDPair operatorId : jobVertex.getOperatorIDs()) {
+                operatorStates.put(
+                        operatorId.getGeneratedOperatorID(),
+                        OperatorSubtaskState.builder()
+                                .setManagedKeyedState(keyedStateHandle)
+                                .build());
+            }
+        }
+        return operatorStates;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 509ea292018..862722407de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -1074,9 +1074,16 @@ class ExecutingTest {
         MockExecutionJobVertex(
                 Function<ExecutionJobVertex, ExecutionVertex> 
executionVertexSupplier)
                 throws JobException {
+            this(executionVertexSupplier, new JobVertex("test"));
+        }
+
+        MockExecutionJobVertex(
+                final Function<ExecutionJobVertex, ExecutionVertex> 
executionVertexSupplier,
+                final JobVertex jobVertex)
+                throws JobException {
             super(
                     new MockInternalExecutionGraphAccessor(),
-                    new JobVertex("test"),
+                    jobVertex,
                     new DefaultVertexParallelismInfo(1, 1, max -> 
Optional.empty()),
                     new CoordinatorStoreImpl(),
                     
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
index 445eedc82ea..9be8dc41ec7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation.VertexInformation;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
@@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -39,6 +41,7 @@ import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasSize;
 
@@ -74,6 +77,56 @@ class StateLocalitySlotAssignerTest {
         verifyAssignments(assignments, newParallelism, allocationWith200bytes);
     }
 
+    @Test
+    // In case of local recovery, we want to preserve slot allocations even if 
there is no
+    // keyed managed state available.
+    public void testSlotsPreservationWithNoStateSameParallelism() {
+        final int parallelism = 2;
+        final VertexInformation vertex = createVertex(parallelism);
+        final AllocationID allocationID1 = new AllocationID();
+        final AllocationID allocationID2 = new AllocationID();
+
+        final List<VertexAllocationInformation> previousAllocations =
+                Arrays.asList(
+                        new VertexAllocationInformation(
+                                allocationID1, vertex.getJobVertexID(), 
KeyGroupRange.of(0, 63), 0),
+                        new VertexAllocationInformation(
+                                allocationID2,
+                                vertex.getJobVertexID(),
+                                KeyGroupRange.of(64, 127),
+                                0));
+
+        final Collection<SlotAssignment> assignments =
+                assign(
+                        vertex,
+                        // Providing allocation IDs in reverse order to check 
that assigner fixes
+                        // the order based on previous allocations.
+                        Arrays.asList(allocationID2, allocationID1),
+                        previousAllocations);
+
+        // Extract allocation IDs from assignments sorted by subtask index.
+        final List<AllocationID> subtaskOrderedNewAllocations =
+                assignments.stream()
+                        .sorted(
+                                Comparator.comparingInt(
+                                        assignment ->
+                                                assignment
+                                                        .getTargetAs(
+                                                                
SlotSharingSlotAllocator
+                                                                        
.ExecutionSlotSharingGroup
+                                                                        .class)
+                                                        
.getContainedExecutionVertices()
+                                                        .stream()
+                                                        .mapToInt(
+                                                                
ExecutionVertexID::getSubtaskIndex)
+                                                        .findAny()
+                                                        .orElseThrow()))
+                        .map(assignment -> 
assignment.getSlotInfo().getAllocationId())
+                        .collect(Collectors.toList());
+
+        
assertThat(subtaskOrderedNewAllocations).containsExactly(allocationID1, 
allocationID2);
+    }
+
     @Test
     void testSlotsAreNotWasted() {
         VertexInformation vertex = createVertex(2);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
index 0fd638c43bf..b0897d54f53 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
@@ -21,9 +21,12 @@ package 
org.apache.flink.runtime.scheduler.adaptive.allocator;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
 /** Testing implementation of {@link SlotAllocator}. */
@@ -32,14 +35,39 @@ public class TestingSlotAllocator implements SlotAllocator {
     private final Function<Iterable<JobInformation.VertexInformation>, 
ResourceCounter>
             calculateRequiredSlotsFunction;
 
-    private final Function<VertexParallelism, Optional<ReservedSlots>> 
tryReserveResourcesFunction;
+    private final Function<JobSchedulingPlan, Optional<ReservedSlots>> 
tryReserveResourcesFunction;
+
+    private final BiFunction<
+                    JobInformation, Collection<? extends SlotInfo>, 
Optional<VertexParallelism>>
+            determineParallelismFunction;
+
+    private final TriFunction<
+                    JobInformation,
+                    Collection<? extends SlotInfo>,
+                    JobAllocationsInformation,
+                    Optional<JobSchedulingPlan>>
+            determineParallelismAndCalculateAssignmentFunction;
 
     private TestingSlotAllocator(
             Function<Iterable<JobInformation.VertexInformation>, 
ResourceCounter>
                     calculateRequiredSlotsFunction,
-            Function<VertexParallelism, Optional<ReservedSlots>> 
tryReserveResourcesFunction) {
+            Function<JobSchedulingPlan, Optional<ReservedSlots>> 
tryReserveResourcesFunction,
+            final BiFunction<
+                            JobInformation,
+                            Collection<? extends SlotInfo>,
+                            Optional<VertexParallelism>>
+                    determineParallelismFunction,
+            final TriFunction<
+                            JobInformation,
+                            Collection<? extends SlotInfo>,
+                            JobAllocationsInformation,
+                            Optional<JobSchedulingPlan>>
+                    determineParallelismAndCalculateAssignmentFunction) {
         this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction;
         this.tryReserveResourcesFunction = tryReserveResourcesFunction;
+        this.determineParallelismFunction = determineParallelismFunction;
+        this.determineParallelismAndCalculateAssignmentFunction =
+                determineParallelismAndCalculateAssignmentFunction;
     }
 
     @Override
@@ -51,7 +79,7 @@ public class TestingSlotAllocator implements SlotAllocator {
     @Override
     public Optional<VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
slots) {
-        return Optional.empty();
+        return determineParallelismFunction.apply(jobInformation, slots);
     }
 
     @Override
@@ -59,12 +87,13 @@ public class TestingSlotAllocator implements SlotAllocator {
             JobInformation jobInformation,
             Collection<? extends SlotInfo> slots,
             JobAllocationsInformation jobAllocationsInformation) {
-        return Optional.empty();
+        return determineParallelismAndCalculateAssignmentFunction.apply(
+                jobInformation, slots, jobAllocationsInformation);
     }
 
     @Override
     public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan 
jobSchedulingPlan) {
-        return 
tryReserveResourcesFunction.apply(jobSchedulingPlan.getVertexParallelism());
+        return tryReserveResourcesFunction.apply(jobSchedulingPlan);
     }
 
     public static Builder newBuilder() {
@@ -75,9 +104,21 @@ public class TestingSlotAllocator implements SlotAllocator {
     public static final class Builder {
         private Function<Iterable<JobInformation.VertexInformation>, 
ResourceCounter>
                 calculateRequiredSlotsFunction = ignored -> 
ResourceCounter.empty();
-        private Function<VertexParallelism, Optional<ReservedSlots>> 
tryReserveResourcesFunction =
+        private Function<JobSchedulingPlan, Optional<ReservedSlots>> 
tryReserveResourcesFunction =
                 ignored -> Optional.empty();
 
+        private BiFunction<
+                        JobInformation, Collection<? extends SlotInfo>, 
Optional<VertexParallelism>>
+                determineSlotsFunction = (jobInformation, slots) -> 
Optional.empty();
+
+        private TriFunction<
+                        JobInformation,
+                        Collection<? extends SlotInfo>,
+                        JobAllocationsInformation,
+                        Optional<JobSchedulingPlan>>
+                determineParallelismAndCalculateAssignmentFunction =
+                        (jobInformation, slots, jobAllocationsInformation) -> 
Optional.empty();
+
         public Builder setCalculateRequiredSlotsFunction(
                 Function<Iterable<JobInformation.VertexInformation>, 
ResourceCounter>
                         calculateRequiredSlotsFunction) {
@@ -85,15 +126,56 @@ public class TestingSlotAllocator implements SlotAllocator 
{
             return this;
         }
 
+        public Builder setDetermineParallelismFunction(
+                BiFunction<
+                                JobInformation,
+                                Collection<? extends SlotInfo>,
+                                Optional<VertexParallelism>>
+                        determineParallelismFunction) {
+            this.determineSlotsFunction = determineParallelismFunction;
+            return this;
+        }
+
+        public Builder setDetermineParallelismAndCalculateAssignmentFunction(
+                TriFunction<
+                                JobInformation,
+                                Collection<? extends SlotInfo>,
+                                JobAllocationsInformation,
+                                Optional<JobSchedulingPlan>>
+                        determineParallelismAndCalculateAssignmentFunction) {
+            this.determineParallelismAndCalculateAssignmentFunction =
+                    determineParallelismAndCalculateAssignmentFunction;
+            return this;
+        }
+
         public Builder setTryReserveResourcesFunction(
-                Function<VertexParallelism, Optional<ReservedSlots>> 
tryReserveResourcesFunction) {
+                Function<JobSchedulingPlan, Optional<ReservedSlots>> 
tryReserveResourcesFunction) {
             this.tryReserveResourcesFunction = tryReserveResourcesFunction;
             return this;
         }
 
         public TestingSlotAllocator build() {
             return new TestingSlotAllocator(
-                    calculateRequiredSlotsFunction, 
tryReserveResourcesFunction);
+                    calculateRequiredSlotsFunction,
+                    tryReserveResourcesFunction,
+                    determineSlotsFunction,
+                    determineParallelismAndCalculateAssignmentFunction);
         }
     }
+
+    public static TestingSlotAllocator 
getArgumentCapturingDelegatingSlotAllocator(
+            final SlotAllocator slotAllocator,
+            final List<JobAllocationsInformation> capturedAllocations) {
+        return TestingSlotAllocator.newBuilder()
+                
.setCalculateRequiredSlotsFunction(slotAllocator::calculateRequiredSlots)
+                
.setTryReserveResourcesFunction(slotAllocator::tryReserveResources)
+                
.setDetermineParallelismFunction(slotAllocator::determineParallelism)
+                .setDetermineParallelismAndCalculateAssignmentFunction(
+                        (jobInformation, slotInfos, jobAllocationsInformation) 
-> {
+                            capturedAllocations.add(jobAllocationsInformation);
+                            return 
slotAllocator.determineParallelismAndCalculateAssignment(
+                                    jobInformation, slotInfos, 
jobAllocationsInformation);
+                        })
+                .build();
+    }
 }
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index 03b13fbc676..48823bf62d5 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -61,7 +61,7 @@ This file is based on the checkstyle file of Apache Beam.
        -->
 
        <module name="FileLength">
-               <property name="max" value="3100"/>
+               <property name="max" value="3150"/>
        </module>
 
        <!-- All Java AST specific tests live under TreeWalker module. -->

Reply via email to