This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e42cdd1efe3 [FLINK-38338][runtime] Introduce the abstraction to describe a rescale event. (#26981)
e42cdd1efe3 is described below

commit e42cdd1efe354d406c8791f319292d3b185d9281
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Dec 1 10:22:05 2025 +0800

    [FLINK-38338][runtime] Introduce the abstraction to describe a rescale event. (#26981)
---
 .../jobmanager/scheduler/SlotSharingGroup.java     |  37 +-
 .../flink/runtime/scheduler/adaptive/Finished.java |   9 +
 .../scheduler/adaptive/JobGraphJobInformation.java |  19 +-
 .../flink/runtime/scheduler/adaptive/State.java    |  15 +-
 .../adaptive/StateWithExecutionGraph.java          |  11 +
 .../adaptive/StateWithoutExecutionGraph.java       |   9 +
 .../adaptive/allocator/AllocatorUtil.java          |   6 +-
 .../adaptive/allocator/JobInformation.java         |   7 +
 .../allocator/SlotSharingSlotAllocator.java        |  38 +-
 .../scheduler/adaptive/timeline/Durable.java       |  51 +++
 .../scheduler/adaptive/timeline/Rescale.java       | 458 +++++++++++++++++++++
 .../scheduler/adaptive/timeline/RescaleIdInfo.java |  81 ++++
 .../adaptive/timeline/SchedulerStateSpan.java      | 123 ++++++
 .../adaptive/timeline/SlotSharingGroupRescale.java | 159 +++++++
 .../scheduler/adaptive/timeline/TerminalState.java |  47 +++
 .../adaptive/timeline/TerminatedReason.java        |  59 +++
 .../scheduler/adaptive/timeline/TriggerCause.java  |  37 ++
 .../timeline/VertexParallelismRescale.java         | 160 +++++++
 .../api/graph/StreamingJobGraphGenerator.java      |   3 +
 .../runtime/scheduler/adaptive/ExecutingTest.java  |   6 +
 .../adaptive/allocator/TestVertexInformation.java  |   5 +
 .../scheduler/adaptive/timeline/RescaleTest.java   | 414 +++++++++++++++++++
 .../timeline/TestingAdaptiveSchedulerState.java    |  75 ++++
 .../adaptive/timeline/TestingJobInformation.java   |  87 ++++
 24 files changed, 1888 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index c82af11235e..81327551d84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.Collections;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -41,6 +42,8 @@ public class SlotSharingGroup implements java.io.Serializable 
{
 
     private final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
 
+    private String slotSharingGroupName;
+
     // Represents resources of all tasks in the group. Default to be UNKNOWN.
     private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
 
@@ -70,12 +73,44 @@ public class SlotSharingGroup implements 
java.io.Serializable {
         return resourceProfile;
     }
 
+    public String getSlotSharingGroupName() {
+        return slotSharingGroupName;
+    }
+
+    public void setSlotSharingGroupName(String slotSharingGroupName) {
+        this.slotSharingGroupName = slotSharingGroupName;
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(slotSharingGroupId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SlotSharingGroup that = (SlotSharingGroup) o;
+        return Objects.equals(slotSharingGroupId, that.slotSharingGroupId);
+    }
+
     @Override
     public String toString() {
-        return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + 
resourceProfile + '}';
+        return "SlotSharingGroup{"
+                + "ids="
+                + ids
+                + ", slotSharingGroupId="
+                + slotSharingGroupId
+                + ", slotSharingGroupName='"
+                + slotSharingGroupName
+                + '\''
+                + ", resourceProfile="
+                + resourceProfile
+                + '}';
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
index 87f4d1f0288..d7abdab8f93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
 
 import org.slf4j.Logger;
 
@@ -34,13 +35,21 @@ class Finished implements State {
 
     private final Logger logger;
 
+    private final Durable durable;
+
     Finished(Context context, ArchivedExecutionGraph archivedExecutionGraph, 
Logger logger) {
         this.archivedExecutionGraph = archivedExecutionGraph;
         this.logger = logger;
+        this.durable = new Durable();
 
         context.onFinished(archivedExecutionGraph);
     }
 
+    @Override
+    public Durable getDurable() {
+        return durable;
+    }
+
     @Override
     public void cancel() {}
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
index 27fef933758..372cc67d15d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -41,7 +42,7 @@ public class JobGraphJobInformation implements JobInformation 
{
     private final JobGraph jobGraph;
     private final JobID jobID;
     private final String name;
-    private final VertexParallelismStore vertexParallelismStore;
+    protected final VertexParallelismStore vertexParallelismStore;
 
     public JobGraphJobInformation(
             JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) {
@@ -91,17 +92,19 @@ public class JobGraphJobInformation implements 
JobInformation {
         return InstantiationUtil.cloneUnchecked(jobGraph);
     }
 
+    @Override
     public VertexParallelismStore getVertexParallelismStore() {
         return vertexParallelismStore;
     }
 
-    private static final class JobVertexInformation implements 
JobInformation.VertexInformation {
+    @VisibleForTesting
+    public static final class JobVertexInformation implements 
JobInformation.VertexInformation {
 
         private final JobVertex jobVertex;
 
         private final VertexParallelismInformation parallelismInfo;
 
-        private JobVertexInformation(
+        public JobVertexInformation(
                 JobVertex jobVertex, VertexParallelismInformation 
parallelismInfo) {
             this.jobVertex = jobVertex;
             this.parallelismInfo = parallelismInfo;
@@ -112,6 +115,11 @@ public class JobGraphJobInformation implements 
JobInformation {
             return jobVertex.getID();
         }
 
+        @Override
+        public String getVertexName() {
+            return jobVertex.getName();
+        }
+
         @Override
         public int getMinParallelism() {
             return parallelismInfo.getMinParallelism();
@@ -137,5 +145,10 @@ public class JobGraphJobInformation implements 
JobInformation {
         public CoLocationGroup getCoLocationGroup() {
             return jobVertex.getCoLocationGroup();
         }
+
+        @VisibleForTesting
+        public VertexParallelismInformation getVertexParallelismInfo() {
+            return parallelismInfo;
+        }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
index 1185274fbd7..4c3d2ce721d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.slf4j.Logger;
 
+import java.time.Instant;
 import java.util.Optional;
 import java.util.function.Consumer;
 
@@ -33,14 +35,23 @@ import java.util.function.Consumer;
  * State abstraction of the {@link AdaptiveScheduler}. This interface contains 
all methods every
  * state implementation must support.
  */
-interface State extends LabeledGlobalFailureHandler {
+public interface State extends LabeledGlobalFailureHandler {
+
+    /**
+     * Get the durable time information of the current state.
+     *
+     * @return The durable time information of the current state.
+     */
+    Durable getDurable();
 
     /**
      * This method is called whenever one transitions out of this state.
      *
      * @param newState newState is the state into which the scheduler 
transitions
      */
-    default void onLeave(Class<? extends State> newState) {}
+    default void onLeave(Class<? extends State> newState) {
+        getDurable().setLeaveTimestamp(Instant.now().toEpochMilli());
+    }
 
     /** Cancels the job execution. */
     void cancel();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 8be4a99f0fd..e5cc1e7cdd8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -55,6 +55,7 @@ import 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.KvStateHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
@@ -69,6 +70,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +103,8 @@ abstract class StateWithExecutionGraph implements State {
 
     private final VertexEndOfDataListener vertexEndOfDataListener;
 
+    private final Durable durable;
+
     StateWithExecutionGraph(
             Context context,
             ExecutionGraph executionGraph,
@@ -118,6 +122,7 @@ abstract class StateWithExecutionGraph implements State {
         this.userCodeClassLoader = userClassCodeLoader;
         this.failureCollection = new ArrayList<>(failureCollection);
         this.vertexEndOfDataListener = new 
VertexEndOfDataListener(executionGraph);
+        this.durable = new Durable();
 
         FutureUtils.assertNoException(
                 executionGraph
@@ -137,6 +142,11 @@ abstract class StateWithExecutionGraph implements State {
                                 context.getMainThreadExecutor()));
     }
 
+    @Override
+    public Durable getDurable() {
+        return durable;
+    }
+
     ExecutionGraph getExecutionGraph() {
         return executionGraph;
     }
@@ -156,6 +166,7 @@ abstract class StateWithExecutionGraph implements State {
 
     @Override
     public void onLeave(Class<? extends State> newState) {
+        getDurable().setLeaveTimestamp(Instant.now().toEpochMilli());
         if (!StateWithExecutionGraph.class.isAssignableFrom(newState)) {
             // we are leaving the StateWithExecutionGraph --> we need to 
dispose temporary services
             operatorCoordinatorHandler.disposeAllOperatorCoordinators();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
index 43425445a4c..bb3b8381d77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
 
 import org.slf4j.Logger;
 
@@ -39,9 +40,17 @@ abstract class StateWithoutExecutionGraph implements State {
 
     private final Logger logger;
 
+    private final Durable durable;
+
     StateWithoutExecutionGraph(Context context, Logger logger) {
         this.context = context;
         this.logger = logger;
+        this.durable = new Durable();
+    }
+
+    @Override
+    public Durable getDurable() {
+        return durable;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 8fbe44c3a1d..1d85dbfa663 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 
@@ -35,13 +35,13 @@ class AllocatorUtil {
 
     private AllocatorUtil() {}
 
-    static Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+    static Map<SlotSharingGroup, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
             getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
         return 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
     }
 
     static int getMinimumRequiredSlots(
-            Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+            Map<SlotSharingGroup, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
                     slotSharingGroupMetaInfos) {
         return slotSharingGroupMetaInfos.values().stream()
                 
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
index e5be2502b72..247de28ec5f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 
 import javax.annotation.Nullable;
 
@@ -51,10 +52,16 @@ public interface JobInformation {
 
     Iterable<VertexInformation> getVertices();
 
+    default VertexParallelismStore getVertexParallelismStore() {
+        throw new UnsupportedOperationException();
+    }
+
     /** Information about a single vertex. */
     interface VertexInformation {
         JobVertexID getJobVertexID();
 
+        String getVertexName();
+
         int getMinParallelism();
 
         int getParallelism();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 9ac1ecfbedd..39bfa7902a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -17,11 +17,9 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -119,7 +117,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     public Optional<VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
 
-        final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo =
+        final Map<SlotSharingGroup, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo =
                 getSlotSharingGroupMetaInfos(jobInformation);
 
         final int minimumRequiredSlots = 
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
@@ -128,7 +126,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             return Optional.empty();
         }
 
-        final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism =
+        final Map<SlotSharingGroup, Integer> slotSharingGroupParallelism =
                 determineSlotsPerSharingGroup(
                         jobInformation,
                         freeSlots.size(),
@@ -146,8 +144,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             final Map<JobVertexID, Integer> vertexParallelism =
                     determineVertexParallelism(
                             containedJobVertices,
-                            slotSharingGroupParallelism.get(
-                                    slotSharingGroup.getSlotSharingGroupId()));
+                            slotSharingGroupParallelism.get(slotSharingGroup));
             allVertexParallelism.putAll(vertexParallelism);
         }
         return Optional.of(new VertexParallelism(allVertexParallelism));
@@ -219,19 +216,19 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
      * evenly as possible. If a group requires less than an even share of 
slots the remainder is
      * distributed over the remaining groups.
      */
-    private static Map<SlotSharingGroupId, Integer> 
determineSlotsPerSharingGroup(
+    private static Map<SlotSharingGroup, Integer> 
determineSlotsPerSharingGroup(
             JobInformation jobInformation,
             int freeSlots,
             int minRequiredSlots,
-            Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo) {
+            Map<SlotSharingGroup, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo) {
 
         int numUnassignedSlots = freeSlots;
         int numUnassignedSlotSharingGroups = 
jobInformation.getSlotSharingGroups().size();
         int numMinSlotsRequiredByRemainingGroups = minRequiredSlots;
 
-        final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = 
new HashMap<>();
+        final Map<SlotSharingGroup, Integer> slotSharingGroupParallelism = new 
HashMap<>();
 
-        for (SlotSharingGroupId slotSharingGroup :
+        for (SlotSharingGroup slotSharingGroup :
                 
sortSlotSharingGroupsByHighestParallelismRange(slotSharingGroupMetaInfo)) {
             final int minParallelism =
                     
slotSharingGroupMetaInfo.get(slotSharingGroup).getMaxLowerBound();
@@ -264,8 +261,8 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         return slotSharingGroupParallelism;
     }
 
-    private static List<SlotSharingGroupId> 
sortSlotSharingGroupsByHighestParallelismRange(
-            Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo) {
+    private static List<SlotSharingGroup> 
sortSlotSharingGroupsByHighestParallelismRange(
+            Map<SlotSharingGroup, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo) {
 
         return slotSharingGroupMetaInfo.entrySet().stream()
                 .sorted(
@@ -367,7 +364,6 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             this.containedExecutionVertices = containedExecutionVertices;
         }
 
-        @VisibleForTesting
         public SlotSharingGroup getSlotSharingGroup() {
             return slotSharingGroup;
         }
@@ -376,6 +372,10 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             return id;
         }
 
+        public ResourceProfile getResourceProfile() {
+            return slotSharingGroup.getResourceProfile();
+        }
+
         public Collection<ExecutionVertexID> getContainedExecutionVertices() {
             return containedExecutionVertices;
         }
@@ -387,7 +387,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         }
     }
 
-    static class SlotSharingGroupMetaInfo {
+    public static class SlotSharingGroupMetaInfo {
 
         private final int minLowerBound;
         private final int maxLowerBound;
@@ -415,7 +415,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             return maxUpperBound - maxLowerBound;
         }
 
-        public static Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> from(
+        public static Map<SlotSharingGroup, SlotSharingGroupMetaInfo> from(
                 Iterable<JobInformation.VertexInformation> vertices) {
 
             return getPerSlotSharingGroups(
@@ -438,15 +438,15 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                                             metaInfo2.getMaxUpperBound())));
         }
 
-        private static <T> Map<SlotSharingGroupId, T> getPerSlotSharingGroups(
+        static <T> Map<SlotSharingGroup, T> getPerSlotSharingGroups(
                 Iterable<JobInformation.VertexInformation> vertices,
                 Function<JobInformation.VertexInformation, T> mapper,
                 BiFunction<T, T, T> reducer) {
-            final Map<SlotSharingGroupId, T> extractedPerSlotSharingGroups = 
new HashMap<>();
+            final Map<SlotSharingGroup, T> extractedPerSlotSharingGroups = new 
HashMap<>();
             for (JobInformation.VertexInformation vertex : vertices) {
                 extractedPerSlotSharingGroups.compute(
-                        vertex.getSlotSharingGroup().getSlotSharingGroupId(),
-                        (slotSharingGroupId, currentData) ->
+                        vertex.getSlotSharingGroup(),
+                        (ignored, currentData) ->
                                 currentData == null
                                         ? mapper.apply(vertex)
                                         : reducer.apply(currentData, 
mapper.apply(vertex)));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java
new file mode 100644
index 00000000000..b528326ae44
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+
+/** Durable to record enter timestamp and leave timestamp. */
+public class Durable {
+
+    private final Long enterTimestamp;
+    private @Nullable Long leaveTimestamp;
+
+    public Durable() {
+        this(Instant.now().toEpochMilli(), null);
+    }
+
+    public Durable(Long enterTimestamp, @Nullable Long leaveTimestamp) {
+        this.enterTimestamp = enterTimestamp;
+        this.leaveTimestamp = leaveTimestamp;
+    }
+
+    public void setLeaveTimestamp(long timestamp) {
+        this.leaveTimestamp = timestamp;
+    }
+
+    public Long getEnterTimestamp() {
+        return enterTimestamp;
+    }
+
+    public @Nullable Long getLeaveTimestamp() {
+        return leaveTimestamp;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
new file mode 100644
index 00000000000..2fc3ef21b48
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import org.apache.flink.runtime.scheduler.adaptive.State;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.SlotSharingGroupMetaInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The rescale to record the related vertices and slots change during the 
rescaling process.
+ *
+ * <p>This rescale begins when the scheduler initiates a rescaling operation 
and ends when the
+ * rescaling succeeds.
+ *
+ * <pre>
+ *
+ * The structure of the rescale as follows:
+ *
+ * +--> rescale id information:
+ * +    +-->rescale uuid
+ * +    +-->resource requirements id
+ * +    +-->rescale attempt id
+ * +--> vertices:
+ * +    +--> job vertex id-1 -> vertex-1 parallelism rescale:
+ * +    +                       +--> vertex id
+ * +    +                       +--> vertex name
+ * +    +                       +--> slot sharing group id
+ * +    +                       +--> slot sharing group name
+ * +    +                       +--> desired parallelism
+ * +    +                       +--> sufficient parallelism
+ * +    +                       +--> pre-rescale parallelism
+ * +    +                       +--> post-rescale parallelism
+ * +    +--> job vertex id-2 -> vertex-2 parallelism rescale:
+ * +    +                       +--> ...
+ * +    +                       ...
+ * +    ...
+ * +--> slots:
+ * +    +--> slot sharing group id-1 -> slot-1 sharing group rescale:
+ * +    +                               +--> slot sharing group id
+ * +    +                               +--> slot sharing group name
+ * +    +                               +--> required resource profile
+ * +    +                               +--> minimal required slots
+ * +    +                               +--> pre-rescale slots
+ * +    +                               +--> post-rescale slots
+ * +    +                               +--> acquired resource profile
+ * +    +--> slot sharing group id-2 -> slot-2 sharing group rescale:
+ * +    +                               +--> ...
+ * +    +                               ...
+ * +    ...
+ * +--> scheduler states:
+ * +    +--> scheduler state span:
+ * +    +    +--> state
+ * +    +    +--> enter timestamp
+ * +    +    +--> leave timestamp
+ * +    +    +--> duration
+ * +    +    +--> exception information
+ * +    +--> ...
+ * +    ...
+ * +--> start timestamp
+ * +--> end timestamp
+ * +--> trigger cause
+ * +--> terminal state
+ * +--> terminated reason
+ *
+ * </pre>
+ *
+ * <p>More design details about the rescale can be found in <a
+ * href="https://cwiki.apache.org/confluence/x/TQr0Ew">FLIP-495</a>.
+ */
+public class Rescale implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final Logger LOG = LoggerFactory.getLogger(Rescale.class);
+
+    @Nullable private transient String stringifiedException;
+
+    private final RescaleIdInfo rescaleIdInfo;
+
+    private final Map<JobVertexID, VertexParallelismRescale> vertices;
+    private final Map<SlotSharingGroupId, SlotSharingGroupRescale> slots;
+
+    private final List<SchedulerStateSpan> schedulerStates;
+
+    private Long startTimestamp;
+    private Long endTimestamp;
+
+    private TriggerCause triggerCause;
+    @Nullable private TerminalState terminalState;
+    @Nullable private TerminatedReason terminatedReason;
+
+    Rescale(RescaleIdInfo rescaleIdInfo) {
+        this.rescaleIdInfo = rescaleIdInfo;
+        this.vertices = new HashMap<>();
+        this.slots = new HashMap<>();
+        this.schedulerStates = new ArrayList<>();
+    }
+
+    /**
+     * Appends a scheduler state span to this rescale unless the rescale is already terminated.
+     *
+     * <p>A span that carries no exception of its own inherits the exception currently recorded on
+     * this rescale. The rescale-level exception is cleared whenever a span is added.
+     *
+     * @param schedulerStateSpan the span to append
+     * @return this rescale
+     */
+    @VisibleForTesting
+    Rescale addSchedulerState(SchedulerStateSpan schedulerStateSpan) {
+        if (isTerminated()) {
+            LOG.warn(
+                    "Rescale is already terminated. The scheduler state {} will be ignored.",
+                    schedulerStateSpan);
+            return this;
+        }
+
+        final String spanException = schedulerStateSpan.getStringifiedException();
+        final String effectiveException =
+                spanException != null ? spanException : stringifiedException;
+        // The pending rescale-level exception is consumed by this span either way.
+        stringifiedException = null;
+
+        schedulerStateSpan.setStringifiedException(effectiveException);
+        schedulerStates.add(schedulerStateSpan);
+        return this;
+    }
+
+    public Rescale addSchedulerState(State state) {
+        return addSchedulerState(state, null);
+    }
+
+    /**
+     * Records the time span that the given scheduler state covered as part of this rescale.
+     *
+     * <p>If the state has no leave timestamp yet, the current wall-clock time is used as the
+     * logical leave timestamp.
+     *
+     * @param schedulerState the scheduler state whose {@link Durable} provides the timestamps
+     * @param throwable the failure observed in the state, or {@code null} if there was none
+     * @return this rescale
+     */
+    public Rescale addSchedulerState(State schedulerState, @Nullable Throwable throwable) {
+        Long enterTimestamp = schedulerState.getDurable().getEnterTimestamp();
+        Long leaveTimestamp = schedulerState.getDurable().getLeaveTimestamp();
+
+        long logicLeaveTimestamp =
+                Objects.isNull(leaveTimestamp) ? Instant.now().toEpochMilli() : leaveTimestamp;
+        // Keep the span's exception null when nothing was thrown. Stringifying a null throwable
+        // is expected to yield a non-null placeholder, which would prevent the span-level overload
+        // from falling back to the exception recorded on this rescale.
+        String exceptionStr =
+                Objects.isNull(throwable) ? null : ExceptionUtils.stringifyException(throwable);
+        return addSchedulerState(
+                new SchedulerStateSpan(
+                        schedulerState.getClass().getSimpleName(),
+                        enterTimestamp,
+                        logicLeaveTimestamp,
+                        logicLeaveTimestamp - enterTimestamp,
+                        exceptionStr));
+    }
+
+    public List<SchedulerStateSpan> getSchedulerStates() {
+        return Collections.unmodifiableList(schedulerStates);
+    }
+
+    public Rescale clearSchedulerStates() {
+        this.schedulerStates.clear();
+        return this;
+    }
+
+    @Nullable
+    public TerminalState getTerminalState() {
+        return terminalState;
+    }
+
+    @Nullable
+    public String getStringifiedException() {
+        return stringifiedException;
+    }
+
+    private boolean isTerminated() {
+        return terminalState != null;
+    }
+
+    public Duration getDuration() {
+        if (this.isTerminated() && startTimestamp != null && endTimestamp != 
null) {
+            return Duration.ofMillis(endTimestamp - startTimestamp);
+        }
+        return Duration.ZERO;
+    }
+
+    public Rescale setTerminatedReason(TerminatedReason terminatedReason) {
+        Preconditions.checkNotNull(terminatedReason);
+        if (this.terminatedReason != null) {
+            LOG.warn("The old sealed reason was already set to '{}'", 
this.terminatedReason);
+        }
+        this.terminatedReason = terminatedReason;
+        this.terminalState = terminatedReason.getTerminalState();
+        return this;
+    }
+
+    public Rescale setStartTimestamp(long timestamp) {
+        if (this.startTimestamp != null) {
+            LOG.warn("The old startTimestamp was already set to '{}'", 
this.startTimestamp);
+        }
+        this.startTimestamp = timestamp;
+        return this;
+    }
+
+    public Long getStartTimestamp() {
+        return startTimestamp;
+    }
+
+    public Rescale setEndTimestamp(Long endTimestamp) {
+        if (this.endTimestamp != null) {
+            LOG.warn("The old endTimestamp was already set to '{}'", 
this.endTimestamp);
+        }
+        this.endTimestamp = endTimestamp;
+        return this;
+    }
+
+    public Long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    /**
+     * Records the desired number of slots for every slot sharing group of the job.
+     *
+     * <p>The desired slot count of a group is the largest parallelism among the vertices that
+     * belong to it, or {@code 0} for a group without vertices.
+     *
+     * @param jobInformation the job information providing the groups and vertex parallelisms
+     * @return this rescale
+     */
+    public Rescale setDesiredSlots(JobInformation jobInformation) {
+        for (SlotSharingGroup group : jobInformation.getSlotSharingGroups()) {
+            final int maxParallelism =
+                    group.getJobVertexIds().stream()
+                            .mapToInt(
+                                    vertexId ->
+                                            jobInformation
+                                                    .getVertexInformation(vertexId)
+                                                    .getParallelism())
+                            .max()
+                            .orElse(0);
+            slots.computeIfAbsent(
+                            group.getSlotSharingGroupId(),
+                            ignored -> new SlotSharingGroupRescale(group))
+                    .setDesiredSlots(maxParallelism);
+        }
+        return this;
+    }
+
+    public Rescale setDesiredVertexParallelism(JobInformation jobInformation) {
+        Map<JobVertexID, VertexParallelismInformation> allParallelismInfo =
+                
jobInformation.getVertexParallelismStore().getAllParallelismInfo();
+        for (Map.Entry<JobVertexID, VertexParallelismInformation> entry :
+                allParallelismInfo.entrySet()) {
+            JobVertexID jvId = entry.getKey();
+            SlotSharingGroup slotSharingGroup =
+                    
jobInformation.getVertexInformation(jvId).getSlotSharingGroup();
+            String vertexName = 
jobInformation.getVertexInformation(jvId).getVertexName();
+            VertexParallelismRescale vertexParallelismRescale =
+                    this.vertices.computeIfAbsent(
+                            jvId,
+                            jobVertexID ->
+                                    new VertexParallelismRescale(
+                                            jvId, vertexName, 
slotSharingGroup));
+            vertexParallelismRescale.setRequiredParallelisms(entry.getValue());
+        }
+        return this;
+    }
+
+    public Rescale setMinimalRequiredSlots(JobInformation jobInformation) {
+        final Map<SlotSharingGroup, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo =
+                SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+        for (Map.Entry<SlotSharingGroup, SlotSharingGroupMetaInfo> entry :
+                slotSharingGroupMetaInfo.entrySet()) {
+            SlotSharingGroup sharingGroup = entry.getKey();
+            SlotSharingGroupRescale slotSharingGroupRescale =
+                    slots.computeIfAbsent(
+                            sharingGroup.getSlotSharingGroupId(),
+                            ignored -> new 
SlotSharingGroupRescale(sharingGroup));
+            
slotSharingGroupRescale.setMinimalRequiredSlots(entry.getValue().getMaxLowerBound());
+        }
+        return this;
+    }
+
+    /**
+     * Initializes the pre-rescale parallelisms and slot counts of this rescale from the
+     * post-rescale values of the last completed rescale.
+     *
+     * <p>Nothing is recorded when there is no last completed rescale. A slot sharing group that
+     * the last completed rescale did not record gets a {@code null} pre-rescale slot count.
+     *
+     * @param jobInformation the job information used to create missing vertex and slot entries
+     * @param lastCompletedRescale the previous completed rescale, or {@code null} if none exists
+     * @return this rescale
+     */
+    public Rescale setPreRescaleSlotsAndParallelisms(
+            JobInformation jobInformation, @Nullable Rescale lastCompletedRescale) {
+        if (lastCompletedRescale == null) {
+            LOG.info("No available previous parallelism to set.");
+            return this;
+        }
+        // Iterate the entries directly rather than the key set followed by a second lookup.
+        for (Map.Entry<JobVertexID, VertexParallelismRescale> previousVertex :
+                lastCompletedRescale.vertices.entrySet()) {
+            JobVertexID jobVertexID = previousVertex.getKey();
+            Integer preRescaleParallelism = previousVertex.getValue().getPostRescaleParallelism();
+            JobInformation.VertexInformation vertexInformation =
+                    jobInformation.getVertexInformation(jobVertexID);
+            VertexParallelismRescale vertexParallelismRescale =
+                    vertices.computeIfAbsent(
+                            jobVertexID,
+                            jobVertexId ->
+                                    new VertexParallelismRescale(
+                                            jobVertexId,
+                                            vertexInformation.getVertexName(),
+                                            vertexInformation.getSlotSharingGroup()));
+            vertexParallelismRescale.setPreRescaleParallelism(preRescaleParallelism);
+        }
+
+        for (SlotSharingGroup sharingGroup : jobInformation.getSlotSharingGroups()) {
+            SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();
+            // The previous rescale may not know this group; avoid dereferencing a missing entry.
+            SlotSharingGroupRescale previousSlots =
+                    lastCompletedRescale.slots.get(slotSharingGroupId);
+            Integer preRescaleSlot =
+                    previousSlots == null ? null : previousSlots.getPostRescaleSlots();
+            SlotSharingGroupRescale slotSharingGroupRescale =
+                    slots.computeIfAbsent(
+                            slotSharingGroupId,
+                            ignored -> new SlotSharingGroupRescale(sharingGroup));
+            slotSharingGroupRescale.setPreRescaleSlots(preRescaleSlot);
+        }
+
+        return this;
+    }
+
+    public Rescale setPostRescaleVertexParallelism(
+            JobInformation jobInformation, VertexParallelism 
postRescaleVertexParallelism) {
+
+        Set<JobVertexID> vertices = postRescaleVertexParallelism.getVertices();
+        for (JobVertexID vertexID : vertices) {
+            JobInformation.VertexInformation vertexInformation =
+                    jobInformation.getVertexInformation(vertexID);
+            VertexParallelismRescale vertexParallelismRescale =
+                    this.vertices.computeIfAbsent(
+                            vertexID,
+                            jobVertexId ->
+                                    new VertexParallelismRescale(
+                                            jobVertexId,
+                                            vertexInformation.getVertexName(),
+                                            
vertexInformation.getSlotSharingGroup()));
+
+            vertexParallelismRescale.setPostRescaleParallelism(
+                    postRescaleVertexParallelism.getParallelism(vertexID));
+        }
+        return this;
+    }
+
+    /**
+     * Records, per slot sharing group, how many slots were assigned after the rescale and the
+     * resource profile of one of the assigned slots.
+     *
+     * @param postRescaleSlotAssignments the slot assignments, whose targets are read as {@link
+     *     ExecutionSlotSharingGroup}s
+     * @return this rescale
+     */
+    public Rescale setPostRescaleSlots(
+            Collection<JobSchedulingPlan.SlotAssignment> postRescaleSlotAssignments) {
+        Map<SlotSharingGroup, Set<JobSchedulingPlan.SlotAssignment>> assignmentsPerSharingGroup =
+                postRescaleSlotAssignments.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        slotAssignment ->
+                                                slotAssignment
+                                                        .getTargetAs(
+                                                                ExecutionSlotSharingGroup.class)
+                                                        .getSlotSharingGroup(),
+                                        Collectors.toSet()));
+        for (Map.Entry<SlotSharingGroup, Set<JobSchedulingPlan.SlotAssignment>> entry :
+                assignmentsPerSharingGroup.entrySet()) {
+            SlotSharingGroup sharingGroup = entry.getKey();
+            // Use the entry's value directly instead of looking the key up in the map again.
+            Set<JobSchedulingPlan.SlotAssignment> assignments = entry.getValue();
+            int postRescaleSlot = assignments.size();
+            // groupingBy only creates a group for keys that occur, so the set is never empty.
+            ResourceProfile acquiredResource =
+                    assignments.iterator().next().getSlotInfo().getResourceProfile();
+            SlotSharingGroupRescale slotSharingGroupRescale =
+                    slots.computeIfAbsent(
+                            sharingGroup.getSlotSharingGroupId(),
+                            ignored -> new SlotSharingGroupRescale(sharingGroup));
+            slotSharingGroupRescale.setPostRescaleSlots(postRescaleSlot);
+            slotSharingGroupRescale.setAcquiredResourceProfile(acquiredResource);
+        }
+        return this;
+    }
+
+    public Rescale setTriggerCause(TriggerCause triggerCause) {
+        this.triggerCause = triggerCause;
+        return this;
+    }
+
+    public TriggerCause getTriggerCause() {
+        return triggerCause;
+    }
+
+    public void log() {
+        LOG.info("Updated rescale is: {}", this);
+    }
+
+    public Map<JobVertexID, VertexParallelismRescale> getVertices() {
+        return Collections.unmodifiableMap(vertices);
+    }
+
+    public Map<SlotSharingGroupId, SlotSharingGroupRescale> getSlots() {
+        return Collections.unmodifiableMap(slots);
+    }
+
+    public static boolean isTerminated(Rescale rescale) {
+        return rescale != null && rescale.isTerminated();
+    }
+
+    public Rescale setStringifiedException(String stringifiedException) {
+        this.stringifiedException = stringifiedException;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "Rescale{"
+                + "stringifiedException='"
+                + stringifiedException
+                + '\''
+                + ", rescaleIdInfo="
+                + rescaleIdInfo
+                + ", vertices="
+                + vertices
+                + ", slots="
+                + slots
+                + ", schedulerStates="
+                + schedulerStates
+                + ", startTimestamp="
+                + startTimestamp
+                + ", endTimestamp="
+                + endTimestamp
+                + ", triggerCause="
+                + triggerCause
+                + ", terminalState="
+                + terminalState
+                + ", terminatedReason="
+                + terminatedReason
+                + '}';
+    }
+
+    @VisibleForTesting
+    Map<JobVertexID, VertexParallelismRescale> getModifiableVertices() {
+        return vertices;
+    }
+
+    @VisibleForTesting
+    Map<SlotSharingGroupId, SlotSharingGroupRescale> getModifiableSlots() {
+        return slots;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
new file mode 100644
index 00000000000..94d36cc1393
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The class to represent the rescale id description in one resource requirements rescale.
+ *
+ * <p>Each instance carries a rescale UUID created in the constructor, together with the resource
+ * requirements id and the rescale attempt id supplied by the caller.
+ */
+public class RescaleIdInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final AbstractID rescaleUuid;
+    private final AbstractID resourceRequirementsId;
+    private final long rescaleAttemptId;
+
+    /**
+     * Creates the id information and assigns a new rescale UUID.
+     *
+     * @param resourceRequirementsId the id of the resource requirements this rescale belongs to
+     * @param rescaleAttemptId the attempt id of the rescale; must not be {@code null}
+     * @throws NullPointerException if {@code rescaleAttemptId} is {@code null}
+     */
+    public RescaleIdInfo(AbstractID resourceRequirementsId, Long rescaleAttemptId) {
+        this.resourceRequirementsId = resourceRequirementsId;
+        // Fail with a descriptive message instead of an anonymous unboxing NPE.
+        this.rescaleAttemptId =
+                Objects.requireNonNull(rescaleAttemptId, "rescaleAttemptId must not be null");
+        this.rescaleUuid = new AbstractID();
+    }
+
+    /** Returns the UUID that was created for this rescale. */
+    public AbstractID getRescaleUuid() {
+        return rescaleUuid;
+    }
+
+    /** Returns the id of the resource requirements this rescale belongs to. */
+    public AbstractID getResourceRequirementsId() {
+        return resourceRequirementsId;
+    }
+
+    /** Returns the attempt id of this rescale. */
+    public long getRescaleAttemptId() {
+        return rescaleAttemptId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RescaleIdInfo rescaleIdInfo = (RescaleIdInfo) o;
+        return rescaleAttemptId == rescaleIdInfo.rescaleAttemptId
+                && Objects.equals(rescaleUuid, rescaleIdInfo.rescaleUuid)
+                && Objects.equals(resourceRequirementsId, rescaleIdInfo.resourceRequirementsId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rescaleUuid, resourceRequirementsId, rescaleAttemptId);
+    }
+
+    @Override
+    public String toString() {
+        // Print the actual class name; the previous label "IdEpoch" was a leftover name.
+        return "RescaleIdInfo{"
+                + "rescaleUuid="
+                + rescaleUuid
+                + ", resourceRequirementsId="
+                + resourceRequirementsId
+                + ", rescaleAttemptId="
+                + rescaleAttemptId
+                + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
new file mode 100644
index 00000000000..1c7e1d02ec3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Utils class to record the information of a scheduler state that contains 
the span of the time of
+ * the adaptive scheduler state, enter timestamp, leave timestamp, the 
exception if occurred during
+ * the adaptive scheduler state.
+ */
+public class SchedulerStateSpan implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String state;
+
+    @Nullable private final Long enterTimestamp;
+
+    @Nullable private final Long leaveTimestamp;
+
+    @Nullable private final Long duration;
+
+    @Nullable private String stringifiedException;
+
+    public SchedulerStateSpan(
+            String state,
+            Long logicEnterMillis,
+            Long logicLeaveMillis,
+            Long duration,
+            String stringifiedException) {
+        this.state = Preconditions.checkNotNull(state);
+        this.enterTimestamp = logicEnterMillis;
+        this.leaveTimestamp = logicLeaveMillis;
+        this.duration = duration;
+        this.stringifiedException = stringifiedException;
+    }
+
+    @Nullable
+    public Long getLeaveTimestamp() {
+        return leaveTimestamp;
+    }
+
+    public void setStringifiedException(@Nullable String stringifiedException) 
{
+        this.stringifiedException = stringifiedException;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    @Nullable
+    public Long getEnterTimestamp() {
+        return enterTimestamp;
+    }
+
+    @Nullable
+    public Long getDuration() {
+        return duration;
+    }
+
+    @Nullable
+    public String getStringifiedException() {
+        return stringifiedException;
+    }
+
+    /**
+     * Compares the state name, both timestamps, the duration and the stringified exception.
+     *
+     * @param o the object to compare with
+     * @return {@code true} if {@code o} is a {@code SchedulerStateSpan} of the same class with
+     *     equal field values
+     */
+    @Override
+    public boolean equals(Object o) {
+        // Identity short-circuit avoids comparing every field of an object with itself.
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SchedulerStateSpan that = (SchedulerStateSpan) o;
+        return Objects.equals(state, that.state)
+                && Objects.equals(enterTimestamp, that.enterTimestamp)
+                && Objects.equals(leaveTimestamp, that.leaveTimestamp)
+                && Objects.equals(duration, that.duration)
+                && Objects.equals(stringifiedException, that.stringifiedException);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(state, enterTimestamp, leaveTimestamp, duration, 
stringifiedException);
+    }
+
+    @Override
+    public String toString() {
+        return "SchedulerStateSpan{"
+                + "state='"
+                + state
+                + '\''
+                + ", enterTimestamp="
+                + enterTimestamp
+                + ", leaveTimestamp="
+                + leaveTimestamp
+                + ", duration="
+                + duration
+                + ", stringifiedException='"
+                + stringifiedException
+                + '\''
+                + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SlotSharingGroupRescale.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SlotSharingGroupRescale.java
new file mode 100644
index 00000000000..84ee73275c0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SlotSharingGroupRescale.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The matching information of a requested {@link
+ * org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}.
+ */
+public class SlotSharingGroupRescale implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SlotSharingGroupId slotSharingGroupId;
+    private final String slotSharingGroupName;
+    private final ResourceProfile requiredResourceProfile;
+    private Integer desiredSlots;
+    private Integer minimalRequiredSlots;
+    @Nullable private Integer preRescaleSlots;
+    @Nullable private Integer postRescaleSlots;
+    @Nullable private ResourceProfile acquiredResourceProfile;
+
+    public SlotSharingGroupRescale(SlotSharingGroup slotSharingGroup) {
+        this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
+        this.slotSharingGroupName = slotSharingGroup.getSlotSharingGroupName();
+        this.requiredResourceProfile = slotSharingGroup.getResourceProfile();
+    }
+
+    public SlotSharingGroupId getSlotSharingGroupId() {
+        return slotSharingGroupId;
+    }
+
+    public String getSlotSharingGroupName() {
+        return slotSharingGroupName;
+    }
+
+    public Integer getDesiredSlots() {
+        return desiredSlots;
+    }
+
+    public void setDesiredSlots(Integer desiredSlots) {
+        this.desiredSlots = desiredSlots;
+    }
+
+    public Integer getMinimalRequiredSlots() {
+        return minimalRequiredSlots;
+    }
+
+    public void setMinimalRequiredSlots(Integer minimalRequiredSlots) {
+        this.minimalRequiredSlots = minimalRequiredSlots;
+    }
+
+    @Nullable
+    public Integer getPreRescaleSlots() {
+        return preRescaleSlots;
+    }
+
+    public void setPreRescaleSlots(Integer preRescaleSlots) {
+        this.preRescaleSlots = preRescaleSlots;
+    }
+
+    @Nullable
+    public Integer getPostRescaleSlots() {
+        return postRescaleSlots;
+    }
+
+    public void setPostRescaleSlots(Integer postRescaleSlots) {
+        this.postRescaleSlots = postRescaleSlots;
+    }
+
+    public ResourceProfile getRequiredResourceProfile() {
+        return requiredResourceProfile;
+    }
+
+    @Nullable
+    public ResourceProfile getAcquiredResourceProfile() {
+        return acquiredResourceProfile;
+    }
+
+    public void setAcquiredResourceProfile(ResourceProfile 
acquiredResourceProfile) {
+        this.acquiredResourceProfile = acquiredResourceProfile;
+    }
+
+    /**
+     * Compares the slot sharing group identity, both resource profiles and all slot counts.
+     *
+     * @param o the object to compare with
+     * @return {@code true} if {@code o} is a {@code SlotSharingGroupRescale} of the same class
+     *     with equal field values
+     */
+    @Override
+    public boolean equals(Object o) {
+        // Identity short-circuit avoids comparing every field of an object with itself.
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SlotSharingGroupRescale that = (SlotSharingGroupRescale) o;
+        return Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
+                && Objects.equals(slotSharingGroupName, that.slotSharingGroupName)
+                && Objects.equals(requiredResourceProfile, that.requiredResourceProfile)
+                && Objects.equals(desiredSlots, that.desiredSlots)
+                && Objects.equals(minimalRequiredSlots, that.minimalRequiredSlots)
+                && Objects.equals(preRescaleSlots, that.preRescaleSlots)
+                && Objects.equals(postRescaleSlots, that.postRescaleSlots)
+                && Objects.equals(acquiredResourceProfile, that.acquiredResourceProfile);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                slotSharingGroupId,
+                slotSharingGroupName,
+                requiredResourceProfile,
+                desiredSlots,
+                minimalRequiredSlots,
+                preRescaleSlots,
+                postRescaleSlots,
+                acquiredResourceProfile);
+    }
+
+    @Override
+    public String toString() {
+        return "SlotSharingGroupRescale{"
+                + "slotSharingGroupId="
+                + slotSharingGroupId
+                + ", slotSharingGroupName='"
+                + slotSharingGroupName
+                + '\''
+                + ", requiredResourceProfile="
+                + requiredResourceProfile
+                + ", desiredSlots="
+                + desiredSlots
+                + ", minimalRequiredSlots="
+                + minimalRequiredSlots
+                + ", preRescaleSlots="
+                + preRescaleSlots
+                + ", postRescaleSlots="
+                + postRescaleSlots
+                + ", acquiredResourceProfile="
+                + acquiredResourceProfile
+                + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminalState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminalState.java
new file mode 100644
index 00000000000..cf9272b44b3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminalState.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+/**
+ * The enum to represent the terminal state of a rescale.
+ *
+ * <p>Each constant carries a human-readable description, exposed via {@link #getDescription()}.
+ */
+public enum TerminalState {
+    COMPLETED("It represents the rescale was completed successfully"),
+    FAILED(
+            "It represents the rescale was failed due to some exceptions about 
lack of condition resources."),
+    IGNORED(
+            "It represents the rescale was ignored by some new conditions that 
could trigger a new rescale.\n"
+                    + "For example,\n"
+                    + "   The scheduler has received a new resource request,\n"
+                    + "   A job restart is triggered during rescale due to any 
exception,\n"
+                    + "   Available resources or parallelism do not change 
during rescale,\n"
+                    + "   The job reaches a terminal state during rescale: 
FINISHED, FAILING, CANCELING.");
+
+    /** Human-readable explanation of what this terminal state means. */
+    private final String description;
+
+    TerminalState(String description) {
+        this.description = description;
+    }
+
+    /** Returns the human-readable description attached to this terminal state. */
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Tells whether a terminal state has been set; the check is purely a null test.
+     *
+     * @param terminalState the terminal state to inspect, may be {@code null}
+     * @return {@code true} if {@code terminalState} is non-null, {@code false} otherwise
+     */
+    public static boolean isTerminated(TerminalState terminalState) {
+        return terminalState != null;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminatedReason.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminatedReason.java
new file mode 100644
index 00000000000..3014a02029c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminatedReason.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+/**
+ * The enum to represent the reason why a rescale event is terminated.
+ *
+ * <p>Every reason is bound to exactly one {@link TerminalState} and carries a human-readable
+ * description.
+ */
+public enum TerminatedReason {
+    SUCCEEDED(TerminalState.COMPLETED, "The rescale was completed 
successfully."),
+    EXCEPTION_OCCURRED(
+            TerminalState.FAILED,
+            "The rescale was failed due to some exceptions about no resources 
enough, etc."),
+    RESOURCE_REQUIREMENTS_UPDATED(
+            TerminalState.IGNORED, "The rescale was ignored due to the new 
resource requirements."),
+    NO_RESOURCES_OR_PARALLELISMS_CHANGE(
+            TerminalState.IGNORED,
+            "The rescale was ignored due to no available resources change or 
parallelism change."),
+    JOB_FINISHED(TerminalState.IGNORED, "The rescale was ignored due to the 
job finished."),
+    JOB_FAILED(TerminalState.IGNORED, "The rescale was ignored due to the job 
failed."),
+    JOB_CANCELED(TerminalState.IGNORED, "The rescale was ignored due to the 
job canceled."),
+    /**
+     * The value could be deleted due to
+     * https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h. Merge the current
+     * non-terminated rescale and the new rescale triggered by recoverable failover into the
+     * current rescale.
+     */
+    JOB_FAILOVER_RESTARTING(
+            TerminalState.IGNORED, "The rescale was ignored due to the job 
failover restarting.");
+
+    /** The terminal state a rescale ends up in when it terminates for this reason. */
+    private final TerminalState terminalState;
+
+    /** Human-readable explanation of this reason. */
+    private final String description;
+
+    TerminatedReason(TerminalState terminalState, String description) {
+        this.terminalState = terminalState;
+        this.description = description;
+    }
+
+    /** Returns the {@link TerminalState} this reason is bound to. */
+    public TerminalState getTerminalState() {
+        return terminalState;
+    }
+
+    /** Returns the human-readable description attached to this reason. */
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TriggerCause.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TriggerCause.java
new file mode 100644
index 00000000000..584bdba88c8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TriggerCause.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+/**
+ * The cause that triggered a rescale.
+ *
+ * <p>Each constant carries a human-readable description, exposed via {@link #getDescription()}.
+ */
+public enum TriggerCause {
+    INITIAL_SCHEDULE("The first schedule of the job starting triggered the rescale."),
+    UPDATE_REQUIREMENT("Updating job resource requirements triggered the rescale."),
+    NEW_RESOURCE_AVAILABLE("That new resources were available triggered the rescale."),
+    RECOVERABLE_FAILOVER("Recoverable failover triggered the rescale.");
+
+    /** Human-readable explanation of this trigger cause. */
+    private final String description;
+
+    TriggerCause(String description) {
+        this.description = description;
+    }
+
+    /** Returns the human-readable description attached to this trigger cause. */
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
new file mode 100644
index 00000000000..3ea9476e79a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The rescale information of a {@link org.apache.flink.runtime.jobgraph.JobVertex}.
+ *
+ * <p>The vertex id and the slot sharing group identity are fixed at construction time. The
+ * parallelism values are filled in afterwards through the setters and stay {@code null} until
+ * the corresponding setter has been called.
+ */
+public class VertexParallelismRescale implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final JobVertexID jobVertexId;
+    private String jobVertexName;
+    private final SlotSharingGroupId slotSharingGroupId;
+    private final String slotSharingGroupName;
+
+    /** Unset until {@link #setRequiredParallelisms(VertexParallelismInformation)} is called. */
+    @Nullable private Integer desiredParallelism;
+
+    /** Unset until {@link #setRequiredParallelisms(VertexParallelismInformation)} is called. */
+    @Nullable private Integer sufficientParallelism;
+
+    @Nullable private Integer preRescaleParallelism;
+
+    @Nullable private Integer postRescaleParallelism;
+
+    /**
+     * Creates the rescale record of one job vertex.
+     *
+     * @param jobVertexId id of the job vertex, must not be {@code null}
+     * @param jobVertexName name of the job vertex
+     * @param slotSharingGroup group whose id and name are copied into this record, must not be
+     *     {@code null}
+     * @throws NullPointerException if {@code jobVertexId} or {@code slotSharingGroup} is null
+     */
+    public VertexParallelismRescale(
+            JobVertexID jobVertexId, String jobVertexName, SlotSharingGroup slotSharingGroup) {
+        this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+        this.jobVertexName = jobVertexName;
+        // Fail with an explicit precondition instead of an incidental NPE on the first getter.
+        Preconditions.checkNotNull(slotSharingGroup);
+        this.slotSharingGroupName = slotSharingGroup.getSlotSharingGroupName();
+        this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
+    }
+
+    public JobVertexID getJobVertexId() {
+        return jobVertexId;
+    }
+
+    public String getJobVertexName() {
+        return jobVertexName;
+    }
+
+    public void setJobVertexName(String jobVertexName) {
+        this.jobVertexName = jobVertexName;
+    }
+
+    public SlotSharingGroupId getSlotSharingGroupId() {
+        return slotSharingGroupId;
+    }
+
+    public String getSlotSharingGroupName() {
+        return slotSharingGroupName;
+    }
+
+    @Nullable
+    public Integer getPreRescaleParallelism() {
+        return preRescaleParallelism;
+    }
+
+    public void setPreRescaleParallelism(@Nullable Integer preRescaleParallelism) {
+        this.preRescaleParallelism = preRescaleParallelism;
+    }
+
+    /** Returns the desired parallelism, or {@code null} if it has not been set yet. */
+    @Nullable
+    public Integer getDesiredParallelism() {
+        return desiredParallelism;
+    }
+
+    /** Returns the sufficient parallelism, or {@code null} if it has not been set yet. */
+    @Nullable
+    public Integer getSufficientParallelism() {
+        return sufficientParallelism;
+    }
+
+    /**
+     * Takes the sufficient parallelism from {@code getMinParallelism()} and the desired
+     * parallelism from {@code getParallelism()} of the given parallelism information.
+     *
+     * @param vertexParallelismInformation source of both values
+     */
+    public void setRequiredParallelisms(VertexParallelismInformation vertexParallelismInformation) {
+        this.sufficientParallelism = vertexParallelismInformation.getMinParallelism();
+        this.desiredParallelism = vertexParallelismInformation.getParallelism();
+    }
+
+    @Nullable
+    public Integer getPostRescaleParallelism() {
+        return postRescaleParallelism;
+    }
+
+    public void setPostRescaleParallelism(@Nullable Integer postRescaleParallelism) {
+        this.postRescaleParallelism = postRescaleParallelism;
+    }
+
+    /** Two records are equal when they have the same class and all eight fields are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        VertexParallelismRescale that = (VertexParallelismRescale) o;
+        return Objects.equals(jobVertexId, that.jobVertexId)
+                && Objects.equals(jobVertexName, that.jobVertexName)
+                && Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
+                && Objects.equals(slotSharingGroupName, that.slotSharingGroupName)
+                && Objects.equals(preRescaleParallelism, that.preRescaleParallelism)
+                && Objects.equals(desiredParallelism, that.desiredParallelism)
+                && Objects.equals(sufficientParallelism, that.sufficientParallelism)
+                && Objects.equals(postRescaleParallelism, that.postRescaleParallelism);
+    }
+
+    /** Hashes the same eight fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jobVertexId,
+                jobVertexName,
+                slotSharingGroupId,
+                slotSharingGroupName,
+                preRescaleParallelism,
+                desiredParallelism,
+                sufficientParallelism,
+                postRescaleParallelism);
+    }
+
+    @Override
+    public String toString() {
+        return "VertexParallelismRescale{"
+                + "jobVertexId="
+                + jobVertexId
+                + ", jobVertexName='"
+                + jobVertexName
+                + '\''
+                + ", slotSharingGroupId="
+                + slotSharingGroupId
+                + ", slotSharingGroupName='"
+                + slotSharingGroupName
+                + '\''
+                + ", desiredParallelism="
+                + desiredParallelism
+                + ", sufficientParallelism="
+                + sufficientParallelism
+                + ", preRescaleParallelism="
+                + preRescaleParallelism
+                + ", postRescaleParallelism="
+                + postRescaleParallelism
+                + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1aa62761d43..ae948dc3852 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1926,6 +1926,8 @@ public class StreamingJobGraphGenerator {
                 // fallback to the region slot sharing group by default
                 effectiveSlotSharingGroup =
                         
checkNotNull(vertexRegionSlotSharingGroups.get(vertex.getID()));
+                effectiveSlotSharingGroup.setSlotSharingGroupName(
+                        StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP);
             } else {
                 checkState(
                         !jobVertexBuildContext.hasHybridResultPartition(),
@@ -1941,6 +1943,7 @@ public class StreamingJobGraphGenerator {
                                             
.ifPresent(ssg::setResourceProfile);
                                     return ssg;
                                 });
+                
effectiveSlotSharingGroup.setSlotSharingGroupName(slotSharingGroupKey);
             }
 
             vertex.setSlotSharingGroup(effectiveSlotSharingGroup);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 862722407de..9fccfb70fac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -63,6 +63,7 @@ import 
org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
@@ -1040,6 +1041,11 @@ class ExecutingTest {
         @Override
         public void cancel() {}
 
+        @Override
+        public Durable getDurable() {
+            // Test stub: this mock state does not provide a Durable.
+            return null;
+        }
+
         @Override
         public void suspend(Throwable cause) {}
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
index bc42c9422fa..8d776e5c58e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
@@ -82,6 +82,11 @@ public class TestVertexInformation implements 
JobInformation.VertexInformation {
         return jobVertexId;
     }
 
+    /** Returns a synthetic name: {@code "JobVertex-"} followed by the vertex id's string form. */
+    @Override
+    public String getVertexName() {
+        return "JobVertex-" + jobVertexId.toString();
+    }
+
     @Override
     public int getMinParallelism() {
         return minParallelism;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
new file mode 100644
index 00000000000..3d0775860b5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import org.apache.flink.runtime.scheduler.adaptive.State;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Rescale}. */
+class RescaleTest {
+
+    private final Function<Integer, Optional<String>> ingoredFunction = 
integer -> Optional.empty();
+
+    private final SlotSharingGroup slotSharingGroupA = new SlotSharingGroup();
+    private final SlotSharingGroup slotSharingGroupB = new SlotSharingGroup();
+
+    private List<SlotSharingGroup> slotSharingGroups;
+
+    private JobVertex jobVertex1OfSlotSharingGroupA;
+    private JobVertex jobVertex2OfSlotSharingGroupA;
+    private JobVertex jobVertex1OfSlotSharingGroupB;
+    private JobVertex jobVertex2OfSlotSharingGroupB;
+
+    private List<JobVertex> jobVertices;
+
+    private JobGraphJobInformation.JobVertexInformation 
jobVertexInformation1OfSlotSharingGroupA;
+    private JobGraphJobInformation.JobVertexInformation 
jobVertexInformation2OfSlotSharingGroupA;
+    private JobGraphJobInformation.JobVertexInformation 
jobVertexInformation1OfSlotSharingGroupB;
+    private JobGraphJobInformation.JobVertexInformation 
jobVertexInformation2OfSlotSharingGroupB;
+
+    private JobInformation jobInformation;
+
+    /**
+     * Builds the shared fixture: two named slot sharing groups ("ssgA", "ssgB") with two job
+     * vertices each, one parallelism info per vertex, and a {@code TestingJobInformation} that
+     * bundles the groups, the vertices and the parallelism store.
+     */
+    @BeforeEach
+    void setup() {
+        this.slotSharingGroupA.setSlotSharingGroupName("ssgA");
+        this.slotSharingGroupB.setSlotSharingGroupName("ssgB");
+
+        this.slotSharingGroups =
+                new ArrayList<>() {
+                    {
+                        add(slotSharingGroupA);
+                        add(slotSharingGroupB);
+                    }
+                };
+
+        this.jobVertex1OfSlotSharingGroupA = new 
JobVertex("JobVertex1OfSlotSharingGroupA");
+        jobVertex1OfSlotSharingGroupA.setSlotSharingGroup(slotSharingGroupA);
+        this.jobVertex2OfSlotSharingGroupA = new 
JobVertex("JobVertex2OfSlotSharingGroupA");
+        jobVertex2OfSlotSharingGroupA.setSlotSharingGroup(slotSharingGroupA);
+
+        this.jobVertex1OfSlotSharingGroupB = new 
JobVertex("JobVertex1OfSlotSharingGroupB");
+        jobVertex1OfSlotSharingGroupB.setSlotSharingGroup(slotSharingGroupB);
+        this.jobVertex2OfSlotSharingGroupB = new 
JobVertex("JobVertex2OfSlotSharingGroupB");
+        jobVertex2OfSlotSharingGroupB.setSlotSharingGroup(slotSharingGroupB);
+
+        this.jobVertices =
+                new ArrayList<>() {
+                    {
+                        add(jobVertex1OfSlotSharingGroupA);
+                        add(jobVertex2OfSlotSharingGroupA);
+                        add(jobVertex1OfSlotSharingGroupB);
+                        add(jobVertex2OfSlotSharingGroupB);
+                    }
+                };
+
+        // NOTE(review): the first three DefaultVertexParallelismInfo arguments are presumably
+        // (min parallelism, parallelism, max parallelism); the tests below expect the first value
+        // as "sufficient" and the second as "desired" parallelism - confirm against the class.
+        this.jobVertexInformation1OfSlotSharingGroupA =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex1OfSlotSharingGroupA,
+                        new DefaultVertexParallelismInfo(1, 2, 4, 
ingoredFunction));
+        this.jobVertexInformation2OfSlotSharingGroupA =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex2OfSlotSharingGroupA,
+                        new DefaultVertexParallelismInfo(2, 3, 8, 
ingoredFunction));
+        this.jobVertexInformation1OfSlotSharingGroupB =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex1OfSlotSharingGroupB,
+                        new DefaultVertexParallelismInfo(3, 4, 16, 
ingoredFunction));
+        this.jobVertexInformation2OfSlotSharingGroupB =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex2OfSlotSharingGroupB,
+                        new DefaultVertexParallelismInfo(4, 5, 32, 
ingoredFunction));
+
+        DefaultVertexParallelismStore defaultVertexParallelismStore =
+                new DefaultVertexParallelismStore();
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex1OfSlotSharingGroupA.getID(),
+                
jobVertexInformation1OfSlotSharingGroupA.getVertexParallelismInfo());
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex2OfSlotSharingGroupA.getID(),
+                
jobVertexInformation2OfSlotSharingGroupA.getVertexParallelismInfo());
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex1OfSlotSharingGroupB.getID(),
+                
jobVertexInformation1OfSlotSharingGroupB.getVertexParallelismInfo());
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex2OfSlotSharingGroupB.getID(),
+                
jobVertexInformation2OfSlotSharingGroupB.getVertexParallelismInfo());
+
+        this.jobInformation =
+                new TestingJobInformation(
+                        new HashSet<>() {
+                            {
+                                add(slotSharingGroupA);
+                                add(slotSharingGroupB);
+                            }
+                        },
+                        new ArrayList<>() {
+                            {
+                                add(jobVertex1OfSlotSharingGroupA);
+                                add(jobVertex2OfSlotSharingGroupA);
+                                add(jobVertex1OfSlotSharingGroupB);
+                                add(jobVertex2OfSlotSharingGroupB);
+                            }
+                        },
+                        defaultVertexParallelismStore);
+    }
+
+    /**
+     * Covers {@code Rescale#addSchedulerState}: a span added to an already terminated rescale is
+     * dropped, a span added to a non-terminated rescale is recorded, an exception previously set
+     * on the rescale shows up on the recorded span (and is no longer set on the rescale), and a
+     * state added without a leave timestamp gets one that is not later than "now".
+     */
+    @Test
+    void testAddSchedulerState() {
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+
+        // Test for add a state span into a terminated rescale.
+        rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+        rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null, 
null));
+        assertThat(rescale.getSchedulerStates()).isEmpty();
+
+        // Test for add a state span into a non-terminated rescale.
+        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null, 
null));
+        assertThat(rescale.getSchedulerStates()).hasSize(1);
+
+        // Test the correctness of throwable processing.
+        String stringifiedException1 =
+                ExceptionUtils.stringifyException(new 
RuntimeException("exception1"));
+        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setStringifiedException(stringifiedException1);
+        rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null, 
null));
+        SchedulerStateSpan schedulerStateSpan = 
rescale.getSchedulerStates().get(0);
+        
assertThat(schedulerStateSpan.getStringifiedException()).isEqualTo(stringifiedException1);
+        assertThat(rescale.getStringifiedException()).isNull();
+
+        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.addSchedulerState(
+                new SchedulerStateSpan("", null, null, null, 
stringifiedException1));
+        schedulerStateSpan = rescale.getSchedulerStates().get(0);
+        
assertThat(schedulerStateSpan.getStringifiedException()).isEqualTo(stringifiedException1);
+
+        // Test the correctness of the end time of span auto-fulfill.
+        State stateWithoutEndTimestamp = new TestingAdaptiveSchedulerState(2L, 
null);
+        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setStringifiedException(stringifiedException1);
+        rescale.addSchedulerState(stateWithoutEndTimestamp, null);
+        schedulerStateSpan = rescale.getSchedulerStates().get(0);
+        assertThat(schedulerStateSpan.getLeaveTimestamp())
+                .isLessThanOrEqualTo(Instant.now().toEpochMilli());
+    }
+
+    /**
+     * Verifies that {@code setDesiredSlots} yields one entry per slot sharing group, keyed by the
+     * group id, with desired slots 3 and 5 and an UNKNOWN required resource profile. The values 3
+     * and 5 match the largest second argument passed to {@code DefaultVertexParallelismInfo}
+     * within each group in {@code setup()} (group A: 2 and 3, group B: 4 and 5).
+     */
+    @Test
+    void testSetDesiredSlots() {
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setDesiredSlots(jobInformation);
+        Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = 
rescale.getSlots();
+        assertThat(slots).hasSize(2);
+        assertThat(slots.keySet())
+                .hasSameElementsAs(
+                        slotSharingGroups.stream()
+                                .map(SlotSharingGroup::getSlotSharingGroupId)
+                                .collect(Collectors.toSet()));
+        assertThat(
+                        slots.values().stream()
+                                .map(SlotSharingGroupRescale::getDesiredSlots)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(3, 5);
+        assertThat(
+                        slots.values().stream()
+                                
.map(SlotSharingGroupRescale::getRequiredResourceProfile)
+                                .collect(Collectors.toSet()))
+                .containsExactly(ResourceProfile.UNKNOWN);
+    }
+
+    /**
+     * Verifies that {@code setDesiredVertexParallelism} yields one entry per job vertex, keyed by
+     * the vertex id, and that each entry carries the vertex name, the slot sharing group id and
+     * name of its vertex, plus the desired (2, 3, 4, 5) and sufficient (1, 2, 3, 4) parallelism
+     * configured in {@code setup()}.
+     */
+    @Test
+    void testSetDesiredVertexParallelism() {
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setDesiredVertexParallelism(jobInformation);
+        Map<JobVertexID, VertexParallelismRescale> vertices = 
rescale.getVertices();
+        assertThat(vertices).hasSize(4);
+        assertThat(vertices.keySet())
+                .isEqualTo(
+                        vertices.values().stream()
+                                .map(VertexParallelismRescale::getJobVertexId)
+                                .collect(Collectors.toSet()))
+                .hasSameElementsAs(
+                        
jobVertices.stream().map(JobVertex::getID).collect(Collectors.toSet()));
+
+        assertThat(
+                        vertices.values().stream()
+                                
.map(VertexParallelismRescale::getJobVertexName)
+                                .collect(Collectors.toSet()))
+                .hasSameElementsAs(
+                        
jobVertices.stream().map(JobVertex::getName).collect(Collectors.toSet()));
+
+        assertThat(
+                        vertices.values().stream()
+                                
.map(VertexParallelismRescale::getSlotSharingGroupId)
+                                .collect(Collectors.toSet()))
+                .hasSameElementsAs(
+                        jobVertices.stream()
+                                .map(jv -> 
jv.getSlotSharingGroup().getSlotSharingGroupId())
+                                .collect(Collectors.toSet()));
+
+        assertThat(
+                        vertices.values().stream()
+                                
.map(VertexParallelismRescale::getSlotSharingGroupName)
+                                .collect(Collectors.toSet()))
+                .hasSameElementsAs(
+                        jobVertices.stream()
+                                .map(jv -> 
jv.getSlotSharingGroup().getSlotSharingGroupName())
+                                .collect(Collectors.toSet()));
+
+        assertThat(
+                        vertices.values().stream()
+                                
.map(VertexParallelismRescale::getDesiredParallelism)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(2, 3, 4, 5);
+        assertThat(
+                        vertices.values().stream()
+                                
.map(VertexParallelismRescale::getSufficientParallelism)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(1, 2, 3, 4);
+    }
+
+    /**
+     * Verifies that {@code setMinimalRequiredSlots} yields one entry per slot sharing group with
+     * minimal required slots 2 and 4. These match the largest first argument passed to
+     * {@code DefaultVertexParallelismInfo} within each group in {@code setup()} (group A: 1 and 2,
+     * group B: 3 and 4).
+     */
+    @Test
+    void testSetMinimalRequiredSlots() {
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setMinimalRequiredSlots(jobInformation);
+        Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = 
rescale.getSlots();
+        assertThat(slots).hasSize(2);
+        assertThat(slots.keySet())
+                .containsExactlyInAnyOrder(
+                        slotSharingGroupA.getSlotSharingGroupId(),
+                        slotSharingGroupB.getSlotSharingGroupId());
+        assertThat(
+                        slots.values().stream()
+                                
.map(SlotSharingGroupRescale::getMinimalRequiredSlots)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(2, 4);
+    }
+
+    /**
+     * Exercises {@code setPreRescaleSlotsAndParallelisms} in two situations: with {@code null} as
+     * the last rescale nothing is recorded, and with a last rescale whose post-rescale values are
+     * all 4 the new rescale reports 4 as every pre-rescale slot count and parallelism.
+     */
+    @Test
+    void testSetPreRescaleSlotsAndParallelisms() {
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 2L));
+
+        // Case 1: no last rescale is available.
+        rescale.setPreRescaleSlotsAndParallelisms(jobInformation, null);
+        assertThat(rescale.getSlots()).isEmpty();
+        assertThat(rescale.getVertices()).isEmpty();
+
+        // Case 2: a last rescale is available. Build it by hand and give every vertex and every
+        // slot sharing group a post-rescale value of 4.
+        Rescale lastCompletedRescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+
+        Map<JobVertexID, VertexParallelismRescale> lastRescaleVertices =
+                lastCompletedRescale.getModifiableVertices();
+        jobVertices.forEach(
+                jobVertex ->
+                        lastRescaleVertices
+                                .computeIfAbsent(
+                                        jobVertex.getID(),
+                                        vertexId ->
+                                                new VertexParallelismRescale(
+                                                        vertexId,
+                                                        jobInformation
+                                                                .getVertexInformation(vertexId)
+                                                                .getVertexName(),
+                                                        jobInformation
+                                                                .getVertexInformation(vertexId)
+                                                                .getSlotSharingGroup()))
+                                .setPostRescaleParallelism(4));
+
+        Map<SlotSharingGroupId, SlotSharingGroupRescale> lastRescaleSlotSharingGroups =
+                lastCompletedRescale.getModifiableSlots();
+        slotSharingGroups.forEach(
+                group ->
+                        lastRescaleSlotSharingGroups
+                                .computeIfAbsent(
+                                        group.getSlotSharingGroupId(),
+                                        groupId -> new SlotSharingGroupRescale(group))
+                                .setPostRescaleSlots(4));
+
+        rescale.setPreRescaleSlotsAndParallelisms(jobInformation, lastCompletedRescale);
+
+        // Collecting into a set collapses the per-group / per-vertex values to the distinct ones.
+        assertThat(rescale.getSlots()).hasSize(2);
+        assertThat(
+                        rescale.getSlots().values().stream()
+                                .map(SlotSharingGroupRescale::getPreRescaleSlots)
+                                .collect(Collectors.toSet()))
+                .containsExactly(4);
+        assertThat(
+                        rescale.getVertices().values().stream()
+                                .map(VertexParallelismRescale::getPreRescaleParallelism)
+                                .collect(Collectors.toSet()))
+                .containsExactly(4);
+    }
+
+    /**
+     * Passes a {@link VertexParallelism} that maps every job vertex to parallelism 2 into
+     * {@code setPostRescaleVertexParallelism} and expects 2 to be the only post-rescale
+     * parallelism reported by the rescale's vertices.
+     */
+    @Test
+    void testSetPostRescaleVertexParallelism() {
+        // Fill the map before handing it to VertexParallelism so that the test does not depend
+        // on VertexParallelism keeping a live reference to the caller's map.
+        Map<JobVertexID, Integer> parallelismForVertices = new HashMap<>();
+        jobVertices.forEach(vertex -> parallelismForVertices.put(vertex.getID(), 2));
+        VertexParallelism postVertexParallelism = new VertexParallelism(parallelismForVertices);
+
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setPostRescaleVertexParallelism(jobInformation, postVertexParallelism);
+
+        // Collecting into a set collapses the per-vertex values to the distinct ones.
+        assertThat(
+                        rescale.getVertices().values().stream()
+                                .map(VertexParallelismRescale::getPostRescaleParallelism)
+                                .collect(Collectors.toSet()))
+                .containsExactly(2);
+    }
+
+    /**
+     * Feeds two slot assignments for slot sharing group A and four for slot sharing group B into
+     * {@code setPostRescaleSlots} and expects post-rescale slot counts of 2 and 4, with {@link
+     * ResourceProfile#UNKNOWN} as the only acquired resource profile.
+     */
+    @Test
+    void testSetPostRescaleSlots() {
+        List<JobSchedulingPlan.SlotAssignment> slotAssignments = new ArrayList<>();
+
+        // Two slots for group A.
+        for (int i = 0; i < 2; i++) {
+            slotAssignments.add(
+                    new JobSchedulingPlan.SlotAssignment(
+                            new TestingSlot(ResourceProfile.UNKNOWN),
+                            new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
+                                    slotSharingGroupA, null)));
+        }
+        // Four slots for group B.
+        for (int i = 0; i < 4; i++) {
+            slotAssignments.add(
+                    new JobSchedulingPlan.SlotAssignment(
+                            new TestingSlot(ResourceProfile.UNKNOWN),
+                            new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
+                                    slotSharingGroupB, null)));
+        }
+
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setPostRescaleSlots(slotAssignments);
+
+        // The values are collected into a Set, which has no defined iteration order, so the
+        // assertion must not depend on the order of the elements.
+        assertThat(
+                        rescale.getSlots().values().stream()
+                                .map(SlotSharingGroupRescale::getPostRescaleSlots)
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(2, 4);
+        assertThat(
+                        rescale.getSlots().values().stream()
+                                .map(SlotSharingGroupRescale::getAcquiredResourceProfile)
+                                .collect(Collectors.toSet()))
+                .containsExactly(ResourceProfile.UNKNOWN);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingAdaptiveSchedulerState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingAdaptiveSchedulerState.java
new file mode 100644
index 00000000000..4c35f917da1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingAdaptiveSchedulerState.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.State;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A stub {@link State} for tests of the adaptive scheduler state span.
+ *
+ * <p>The only data it carries is the {@link Durable} built from the timestamps passed to the
+ * constructor. Every other {@link State} method either does nothing or returns {@code null}.
+ */
+public class TestingAdaptiveSchedulerState implements State {
+
+    private final Durable stateDurable;
+
+    /**
+     * @param inTimestamp first argument passed to the {@link Durable} constructor
+     * @param outTimestamp second argument passed to the {@link Durable} constructor; may be
+     *     {@code null}
+     */
+    TestingAdaptiveSchedulerState(Long inTimestamp, @Nullable Long outTimestamp) {
+        stateDurable = new Durable(inTimestamp, outTimestamp);
+    }
+
+    /** Returns the {@link Durable} created in the constructor. */
+    @Override
+    public Durable getDurable() {
+        return stateDurable;
+    }
+
+    // ------------------------------------------------------------------------
+    //  No-op actions
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void cancel() {}
+
+    @Override
+    public void suspend(Throwable cause) {}
+
+    @Override
+    public void handleGlobalFailure(
+            Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {}
+
+    // ------------------------------------------------------------------------
+    //  Accessors that are not backed by any data and therefore return null
+    // ------------------------------------------------------------------------
+
+    @Override
+    public JobID getJobId() {
+        return null;
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        return null;
+    }
+
+    @Override
+    public ArchivedExecutionGraph getJob() {
+        return null;
+    }
+
+    @Override
+    public Logger getLogger() {
+        return null;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingJobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingJobInformation.java
new file mode 100644
index 00000000000..3227345f1ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingJobInformation.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+
+import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link JobInformation} for tests that is assembled directly from slot sharing groups, job
+ * vertices and a {@link VertexParallelismStore}. Its set of co-location groups is always empty.
+ */
+public class TestingJobInformation implements JobInformation {
+
+    private final Set<SlotSharingGroup> sharingGroups;
+    private final Set<CoLocationGroup> coLocations;
+    private final Map<JobVertexID, JobVertex> verticesById;
+    private final VertexParallelismStore parallelismStore;
+
+    /**
+     * Creates the job information.
+     *
+     * @param slotSharingGroups slot sharing groups to expose; copied into a new set
+     * @param jobVertices job vertices to expose; indexed by {@link JobVertex#getID()}
+     * @param vertexParallelismStore store that is queried for the parallelism info of a vertex
+     */
+    public TestingJobInformation(
+            Set<SlotSharingGroup> slotSharingGroups,
+            List<JobVertex> jobVertices,
+            VertexParallelismStore vertexParallelismStore) {
+        this.sharingGroups = new HashSet<>(slotSharingGroups);
+        this.coLocations = new HashSet<>();
+        this.verticesById =
+                jobVertices.stream()
+                        .collect(Collectors.toMap(JobVertex::getID, Function.identity()));
+        this.parallelismStore = vertexParallelismStore;
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return sharingGroups;
+    }
+
+    @Override
+    public Collection<CoLocationGroup> getCoLocationGroups() {
+        return coLocations;
+    }
+
+    /**
+     * Builds a new vertex information object from the job vertex registered under the given id
+     * and the parallelism info the store holds for that id.
+     */
+    @Override
+    public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+        JobVertex vertex = verticesById.get(jobVertexId);
+        return new JobGraphJobInformation.JobVertexInformation(
+                vertex, parallelismStore.getParallelismInfo(jobVertexId));
+    }
+
+    /** Returns a view that maps every registered vertex to its vertex information. */
+    @Override
+    public Iterable<JobInformation.VertexInformation> getVertices() {
+        // Every key of the map is the id of the vertex stored under it, so transforming the keys
+        // is the same as transforming the ids of the values.
+        return Iterables.transform(verticesById.keySet(), this::getVertexInformation);
+    }
+
+    @Override
+    public VertexParallelismStore getVertexParallelismStore() {
+        return parallelismStore;
+    }
+}

Reply via email to