This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e42cdd1efe3 [FLINK-38338][runtime] Introduce the abstraction to
describe a rescale event. (#26981)
e42cdd1efe3 is described below
commit e42cdd1efe354d406c8791f319292d3b185d9281
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Dec 1 10:22:05 2025 +0800
[FLINK-38338][runtime] Introduce the abstraction to describe a rescale
event. (#26981)
---
.../jobmanager/scheduler/SlotSharingGroup.java | 37 +-
.../flink/runtime/scheduler/adaptive/Finished.java | 9 +
.../scheduler/adaptive/JobGraphJobInformation.java | 19 +-
.../flink/runtime/scheduler/adaptive/State.java | 15 +-
.../adaptive/StateWithExecutionGraph.java | 11 +
.../adaptive/StateWithoutExecutionGraph.java | 9 +
.../adaptive/allocator/AllocatorUtil.java | 6 +-
.../adaptive/allocator/JobInformation.java | 7 +
.../allocator/SlotSharingSlotAllocator.java | 38 +-
.../scheduler/adaptive/timeline/Durable.java | 51 +++
.../scheduler/adaptive/timeline/Rescale.java | 458 +++++++++++++++++++++
.../scheduler/adaptive/timeline/RescaleIdInfo.java | 81 ++++
.../adaptive/timeline/SchedulerStateSpan.java | 123 ++++++
.../adaptive/timeline/SlotSharingGroupRescale.java | 159 +++++++
.../scheduler/adaptive/timeline/TerminalState.java | 47 +++
.../adaptive/timeline/TerminatedReason.java | 59 +++
.../scheduler/adaptive/timeline/TriggerCause.java | 37 ++
.../timeline/VertexParallelismRescale.java | 160 +++++++
.../api/graph/StreamingJobGraphGenerator.java | 3 +
.../runtime/scheduler/adaptive/ExecutingTest.java | 6 +
.../adaptive/allocator/TestVertexInformation.java | 5 +
.../scheduler/adaptive/timeline/RescaleTest.java | 414 +++++++++++++++++++
.../timeline/TestingAdaptiveSchedulerState.java | 75 ++++
.../adaptive/timeline/TestingJobInformation.java | 87 ++++
24 files changed, 1888 insertions(+), 28 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index c82af11235e..81327551d84 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import java.util.Collections;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
@@ -41,6 +42,8 @@ public class SlotSharingGroup implements java.io.Serializable
{
private final SlotSharingGroupId slotSharingGroupId = new
SlotSharingGroupId();
+ private String slotSharingGroupName;
+
// Represents resources of all tasks in the group. Default to be UNKNOWN.
private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
@@ -70,12 +73,44 @@ public class SlotSharingGroup implements
java.io.Serializable {
return resourceProfile;
}
+ public String getSlotSharingGroupName() {
+ return slotSharingGroupName;
+ }
+
+ public void setSlotSharingGroupName(String slotSharingGroupName) {
+ this.slotSharingGroupName = slotSharingGroupName;
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
+ @Override
+ public int hashCode() {
+ return Objects.hash(slotSharingGroupId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SlotSharingGroup that = (SlotSharingGroup) o;
+ return Objects.equals(slotSharingGroupId, that.slotSharingGroupId);
+ }
+
@Override
public String toString() {
- return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" +
resourceProfile + '}';
+ return "SlotSharingGroup{"
+ + "ids="
+ + ids
+ + ", slotSharingGroupId="
+ + slotSharingGroupId
+ + ", slotSharingGroupName='"
+ + slotSharingGroupName
+ + '\''
+ + ", resourceProfile="
+ + resourceProfile
+ + '}';
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
index 87f4d1f0288..d7abdab8f93 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import org.slf4j.Logger;
@@ -34,13 +35,21 @@ class Finished implements State {
private final Logger logger;
+ private final Durable durable;
+
Finished(Context context, ArchivedExecutionGraph archivedExecutionGraph,
Logger logger) {
this.archivedExecutionGraph = archivedExecutionGraph;
this.logger = logger;
+ this.durable = new Durable();
context.onFinished(archivedExecutionGraph);
}
+ @Override
+ public Durable getDurable() {
+ return durable;
+ }
+
@Override
public void cancel() {}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
index 27fef933758..372cc67d15d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.scheduler.adaptive;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -41,7 +42,7 @@ public class JobGraphJobInformation implements JobInformation
{
private final JobGraph jobGraph;
private final JobID jobID;
private final String name;
- private final VertexParallelismStore vertexParallelismStore;
+ protected final VertexParallelismStore vertexParallelismStore;
public JobGraphJobInformation(
JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) {
@@ -91,17 +92,19 @@ public class JobGraphJobInformation implements
JobInformation {
return InstantiationUtil.cloneUnchecked(jobGraph);
}
+ @Override
public VertexParallelismStore getVertexParallelismStore() {
return vertexParallelismStore;
}
- private static final class JobVertexInformation implements
JobInformation.VertexInformation {
+ @VisibleForTesting
+ public static final class JobVertexInformation implements
JobInformation.VertexInformation {
private final JobVertex jobVertex;
private final VertexParallelismInformation parallelismInfo;
- private JobVertexInformation(
+ public JobVertexInformation(
JobVertex jobVertex, VertexParallelismInformation
parallelismInfo) {
this.jobVertex = jobVertex;
this.parallelismInfo = parallelismInfo;
@@ -112,6 +115,11 @@ public class JobGraphJobInformation implements
JobInformation {
return jobVertex.getID();
}
+ @Override
+ public String getVertexName() {
+ return jobVertex.getName();
+ }
+
@Override
public int getMinParallelism() {
return parallelismInfo.getMinParallelism();
@@ -137,5 +145,10 @@ public class JobGraphJobInformation implements
JobInformation {
public CoLocationGroup getCoLocationGroup() {
return jobVertex.getCoLocationGroup();
}
+
+ @VisibleForTesting
+ public VertexParallelismInformation getVertexParallelismInfo() {
+ return parallelismInfo;
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
index 1185274fbd7..4c3d2ce721d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
+import java.time.Instant;
import java.util.Optional;
import java.util.function.Consumer;
@@ -33,14 +35,23 @@ import java.util.function.Consumer;
* State abstraction of the {@link AdaptiveScheduler}. This interface contains
all methods every
* state implementation must support.
*/
-interface State extends LabeledGlobalFailureHandler {
+public interface State extends LabeledGlobalFailureHandler {
+
+ /**
+ * Get the durable time information of the current state.
+ *
+ * @return The durable time information of the current state.
+ */
+ Durable getDurable();
/**
* This method is called whenever one transitions out of this state.
*
* @param newState newState is the state into which the scheduler
transitions
*/
- default void onLeave(Class<? extends State> newState) {}
+ default void onLeave(Class<? extends State> newState) {
+ getDurable().setLeaveTimestamp(Instant.now().toEpochMilli());
+ }
/** Cancels the job execution. */
void cancel();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 8be4a99f0fd..e5cc1e7cdd8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -55,6 +55,7 @@ import
org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.KvStateHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
@@ -69,6 +70,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -101,6 +103,8 @@ abstract class StateWithExecutionGraph implements State {
private final VertexEndOfDataListener vertexEndOfDataListener;
+ private final Durable durable;
+
StateWithExecutionGraph(
Context context,
ExecutionGraph executionGraph,
@@ -118,6 +122,7 @@ abstract class StateWithExecutionGraph implements State {
this.userCodeClassLoader = userClassCodeLoader;
this.failureCollection = new ArrayList<>(failureCollection);
this.vertexEndOfDataListener = new
VertexEndOfDataListener(executionGraph);
+ this.durable = new Durable();
FutureUtils.assertNoException(
executionGraph
@@ -137,6 +142,11 @@ abstract class StateWithExecutionGraph implements State {
context.getMainThreadExecutor()));
}
+ @Override
+ public Durable getDurable() {
+ return durable;
+ }
+
ExecutionGraph getExecutionGraph() {
return executionGraph;
}
@@ -156,6 +166,7 @@ abstract class StateWithExecutionGraph implements State {
@Override
public void onLeave(Class<? extends State> newState) {
+ getDurable().setLeaveTimestamp(Instant.now().toEpochMilli());
if (!StateWithExecutionGraph.class.isAssignableFrom(newState)) {
// we are leaving the StateWithExecutionGraph --> we need to
dispose temporary services
operatorCoordinatorHandler.disposeAllOperatorCoordinators();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
index 43425445a4c..bb3b8381d77 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import org.slf4j.Logger;
@@ -39,9 +40,17 @@ abstract class StateWithoutExecutionGraph implements State {
private final Logger logger;
+ private final Durable durable;
+
StateWithoutExecutionGraph(Context context, Logger logger) {
this.context = context;
this.logger = logger;
+ this.durable = new Durable();
+ }
+
+ @Override
+ public Durable getDurable() {
+ return durable;
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 8fbe44c3a1d..1d85dbfa663 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
@@ -35,13 +35,13 @@ class AllocatorUtil {
private AllocatorUtil() {}
- static Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ static Map<SlotSharingGroup,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
return
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
}
static int getMinimumRequiredSlots(
- Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ Map<SlotSharingGroup,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfos) {
return slotSharingGroupMetaInfos.values().stream()
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
index e5be2502b72..247de28ec5f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import javax.annotation.Nullable;
@@ -51,10 +52,16 @@ public interface JobInformation {
Iterable<VertexInformation> getVertices();
+ default VertexParallelismStore getVertexParallelismStore() {
+ throw new UnsupportedOperationException();
+ }
+
/** Information about a single vertex. */
interface VertexInformation {
JobVertexID getJobVertexID();
+ String getVertexName();
+
int getMinParallelism();
int getParallelism();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 9ac1ecfbedd..39bfa7902a3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -17,11 +17,9 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -119,7 +117,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
public Optional<VertexParallelism> determineParallelism(
JobInformation jobInformation, Collection<? extends SlotInfo>
freeSlots) {
- final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo =
+ final Map<SlotSharingGroup, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo =
getSlotSharingGroupMetaInfos(jobInformation);
final int minimumRequiredSlots =
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
@@ -128,7 +126,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
return Optional.empty();
}
- final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism =
+ final Map<SlotSharingGroup, Integer> slotSharingGroupParallelism =
determineSlotsPerSharingGroup(
jobInformation,
freeSlots.size(),
@@ -146,8 +144,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
final Map<JobVertexID, Integer> vertexParallelism =
determineVertexParallelism(
containedJobVertices,
- slotSharingGroupParallelism.get(
- slotSharingGroup.getSlotSharingGroupId()));
+ slotSharingGroupParallelism.get(slotSharingGroup));
allVertexParallelism.putAll(vertexParallelism);
}
return Optional.of(new VertexParallelism(allVertexParallelism));
@@ -219,19 +216,19 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
* evenly as possible. If a group requires less than an even share of
slots the remainder is
* distributed over the remaining groups.
*/
- private static Map<SlotSharingGroupId, Integer>
determineSlotsPerSharingGroup(
+ private static Map<SlotSharingGroup, Integer>
determineSlotsPerSharingGroup(
JobInformation jobInformation,
int freeSlots,
int minRequiredSlots,
- Map<SlotSharingGroupId, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo) {
+ Map<SlotSharingGroup, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo) {
int numUnassignedSlots = freeSlots;
int numUnassignedSlotSharingGroups =
jobInformation.getSlotSharingGroups().size();
int numMinSlotsRequiredByRemainingGroups = minRequiredSlots;
- final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism =
new HashMap<>();
+ final Map<SlotSharingGroup, Integer> slotSharingGroupParallelism = new
HashMap<>();
- for (SlotSharingGroupId slotSharingGroup :
+ for (SlotSharingGroup slotSharingGroup :
sortSlotSharingGroupsByHighestParallelismRange(slotSharingGroupMetaInfo)) {
final int minParallelism =
slotSharingGroupMetaInfo.get(slotSharingGroup).getMaxLowerBound();
@@ -264,8 +261,8 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
return slotSharingGroupParallelism;
}
- private static List<SlotSharingGroupId>
sortSlotSharingGroupsByHighestParallelismRange(
- Map<SlotSharingGroupId, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo) {
+ private static List<SlotSharingGroup>
sortSlotSharingGroupsByHighestParallelismRange(
+ Map<SlotSharingGroup, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo) {
return slotSharingGroupMetaInfo.entrySet().stream()
.sorted(
@@ -367,7 +364,6 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
this.containedExecutionVertices = containedExecutionVertices;
}
- @VisibleForTesting
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}
@@ -376,6 +372,10 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
return id;
}
+ public ResourceProfile getResourceProfile() {
+ return slotSharingGroup.getResourceProfile();
+ }
+
public Collection<ExecutionVertexID> getContainedExecutionVertices() {
return containedExecutionVertices;
}
@@ -387,7 +387,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
}
}
- static class SlotSharingGroupMetaInfo {
+ public static class SlotSharingGroupMetaInfo {
private final int minLowerBound;
private final int maxLowerBound;
@@ -415,7 +415,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
return maxUpperBound - maxLowerBound;
}
- public static Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> from(
+ public static Map<SlotSharingGroup, SlotSharingGroupMetaInfo> from(
Iterable<JobInformation.VertexInformation> vertices) {
return getPerSlotSharingGroups(
@@ -438,15 +438,15 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
metaInfo2.getMaxUpperBound())));
}
- private static <T> Map<SlotSharingGroupId, T> getPerSlotSharingGroups(
+ static <T> Map<SlotSharingGroup, T> getPerSlotSharingGroups(
Iterable<JobInformation.VertexInformation> vertices,
Function<JobInformation.VertexInformation, T> mapper,
BiFunction<T, T, T> reducer) {
- final Map<SlotSharingGroupId, T> extractedPerSlotSharingGroups =
new HashMap<>();
+ final Map<SlotSharingGroup, T> extractedPerSlotSharingGroups = new
HashMap<>();
for (JobInformation.VertexInformation vertex : vertices) {
extractedPerSlotSharingGroups.compute(
- vertex.getSlotSharingGroup().getSlotSharingGroupId(),
- (slotSharingGroupId, currentData) ->
+ vertex.getSlotSharingGroup(),
+ (ignored, currentData) ->
currentData == null
? mapper.apply(vertex)
: reducer.apply(currentData,
mapper.apply(vertex)));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java
new file mode 100644
index 00000000000..b528326ae44
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+
+/** Records an enter timestamp and an optional leave timestamp. */
+public class Durable {
+
+ private final Long enterTimestamp;
+ private @Nullable Long leaveTimestamp;
+
+ public Durable() {
+ this(Instant.now().toEpochMilli(), null);
+ }
+
+ public Durable(Long enterTimestamp, @Nullable Long leaveTimestamp) {
+ this.enterTimestamp = enterTimestamp;
+ this.leaveTimestamp = leaveTimestamp;
+ }
+
+ public void setLeaveTimestamp(long timestamp) {
+ this.leaveTimestamp = timestamp;
+ }
+
+ public Long getEnterTimestamp() {
+ return enterTimestamp;
+ }
+
+ public @Nullable Long getLeaveTimestamp() {
+ return leaveTimestamp;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
new file mode 100644
index 00000000000..2fc3ef21b48
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import org.apache.flink.runtime.scheduler.adaptive.State;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.SlotSharingGroupMetaInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Records the vertex and slot changes that occur during a rescaling process.
+ *
+ * <p>A rescale begins when the scheduler initiates a rescaling operation and
+ * ends when the rescaling succeeds.
+ *
+ * <pre>
+ *
+ * The structure of the rescale is as follows:
+ *
+ * +--> rescale id information:
+ * + +-->rescale uuid
+ * + +-->resource requirements id
+ * + +-->rescale attempt id
+ * +--> vertices:
+ * + +--> job vertex id-1 -> vertex-1 parallelism rescale:
+ * + + +--> vertex id
+ * + + +--> vertex name
+ * + + +--> slot sharing group id
+ * + + +--> slot sharing group name
+ * + + +--> desired parallelism
+ * + + +--> sufficient parallelism
+ * + + +--> pre-rescale parallelism
+ * + + +--> post-rescale parallelism
+ * + +--> job vertex id-2 -> vertex-2 parallelism rescale:
+ * + + +--> ...
+ * + + ...
+ * + ...
+ * +--> slots:
+ * + +--> slot sharing group id-1 -> slot-1 sharing group rescale:
+ * + + +--> slot sharing group id
+ * + + +--> slot sharing group name
+ * + + +--> required resource profile
+ * + + +--> minimal required slots
+ * + + +--> pre-rescale slots
+ * + + +--> post-rescale slots
+ * + + +--> acquired resource profile
+ * + +--> slot sharing group id-2 -> slot-2 sharing group rescale:
+ * + + +--> ...
+ * + + ...
+ * + ...
+ * +--> scheduler states:
+ * + +--> scheduler state span:
+ * + + +--> state
+ * + + +--> enter timestamp
+ * + + +--> leave timestamp
+ * + + +--> duration
+ * + + +--> exception information
+ * + +--> ...
+ * + ...
+ * +--> start timestamp
+ * +--> end timestamp
+ * +--> trigger cause
+ * +--> terminal state
+ * +--> terminated reason
+ *
+ * </pre>
+ *
+ * <p>More design details about the rescale can be found in <a
+ * href="https://cwiki.apache.org/confluence/x/TQr0Ew">FLIP-495</a>.
+ */
+public class Rescale implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final Logger LOG = LoggerFactory.getLogger(Rescale.class);
+
+ @Nullable private transient String stringifiedException;
+
+ private final RescaleIdInfo rescaleIdInfo;
+
+ private final Map<JobVertexID, VertexParallelismRescale> vertices;
+ private final Map<SlotSharingGroupId, SlotSharingGroupRescale> slots;
+
+ private final List<SchedulerStateSpan> schedulerStates;
+
+ private Long startTimestamp;
+ private Long endTimestamp;
+
+ private TriggerCause triggerCause;
+ @Nullable private TerminalState terminalState;
+ @Nullable private TerminatedReason terminatedReason;
+
+ Rescale(RescaleIdInfo rescaleIdInfo) {
+ this.rescaleIdInfo = rescaleIdInfo;
+ this.vertices = new HashMap<>();
+ this.slots = new HashMap<>();
+ this.schedulerStates = new ArrayList<>();
+ }
+
+ @VisibleForTesting
+ Rescale addSchedulerState(SchedulerStateSpan schedulerStateSpan) {
+ if (this.isTerminated()) {
+ LOG.warn(
+ "Rescale is already terminated. The scheduler state {}
will be ignored.",
+ schedulerStateSpan);
+ return this;
+ }
+ String exceptionStr =
+ Objects.isNull(schedulerStateSpan.getStringifiedException())
+ ? stringifiedException
+ : schedulerStateSpan.getStringifiedException();
+ if (stringifiedException != null) {
+ stringifiedException = null;
+ }
+
+ schedulerStateSpan.setStringifiedException(exceptionStr);
+ this.schedulerStates.add(schedulerStateSpan);
+ return this;
+ }
+
+ public Rescale addSchedulerState(State state) {
+ return addSchedulerState(state, null);
+ }
+
+ public Rescale addSchedulerState(State schedulerState, @Nullable Throwable
throwable) {
+ Long enterTimestamp = schedulerState.getDurable().getEnterTimestamp();
+ Long leaveTimestamp = schedulerState.getDurable().getLeaveTimestamp();
+
+ long logicLeaveTimestamp =
+ Objects.isNull(leaveTimestamp) ? Instant.now().toEpochMilli()
: leaveTimestamp;
+ return addSchedulerState(
+ new SchedulerStateSpan(
+ schedulerState.getClass().getSimpleName(),
+ enterTimestamp,
+ logicLeaveTimestamp,
+ logicLeaveTimestamp - enterTimestamp,
+ ExceptionUtils.stringifyException(throwable)));
+ }
+
+ public List<SchedulerStateSpan> getSchedulerStates() {
+ return Collections.unmodifiableList(schedulerStates);
+ }
+
+ public Rescale clearSchedulerStates() {
+ this.schedulerStates.clear();
+ return this;
+ }
+
+ @Nullable
+ public TerminalState getTerminalState() {
+ return terminalState;
+ }
+
+ @Nullable
+ public String getStringifiedException() {
+ return stringifiedException;
+ }
+
+ private boolean isTerminated() {
+ return terminalState != null;
+ }
+
+ public Duration getDuration() {
+ if (this.isTerminated() && startTimestamp != null && endTimestamp !=
null) {
+ return Duration.ofMillis(endTimestamp - startTimestamp);
+ }
+ return Duration.ZERO;
+ }
+
+    /**
+     * Sets the reason why this rescale terminated and derives the terminal state from it.
+     *
+     * <p>A warning is logged when a reason had already been set; the new reason still wins.
+     *
+     * @param terminatedReason the reason, must not be {@code null}
+     * @return this rescale, for call chaining
+     */
+    public Rescale setTerminatedReason(TerminatedReason terminatedReason) {
+        Preconditions.checkNotNull(terminatedReason);
+        final TerminatedReason previousReason = this.terminatedReason;
+        if (previousReason != null) {
+            LOG.warn("The old sealed reason was already set to '{}'", previousReason);
+        }
+        this.terminalState = terminatedReason.getTerminalState();
+        this.terminatedReason = terminatedReason;
+        return this;
+    }
+
+    /**
+     * Sets the start timestamp, logging a warning when an earlier value gets overwritten.
+     *
+     * @param timestamp the start timestamp
+     * @return this rescale, for call chaining
+     */
+    public Rescale setStartTimestamp(long timestamp) {
+        final Long previous = this.startTimestamp;
+        if (previous != null) {
+            LOG.warn("The old startTimestamp was already set to '{}'", previous);
+        }
+        this.startTimestamp = timestamp;
+        return this;
+    }
+
+    /** Returns the start timestamp, or {@code null} if it has not been set. */
+    public Long getStartTimestamp() {
+        return this.startTimestamp;
+    }
+
+    /**
+     * Sets the end timestamp, logging a warning when an earlier value gets overwritten.
+     *
+     * @param endTimestamp the end timestamp
+     * @return this rescale, for call chaining
+     */
+    public Rescale setEndTimestamp(Long endTimestamp) {
+        final Long previous = this.endTimestamp;
+        if (previous != null) {
+            LOG.warn("The old endTimestamp was already set to '{}'", previous);
+        }
+        this.endTimestamp = endTimestamp;
+        return this;
+    }
+
+    /** Returns the end timestamp, or {@code null} if it has not been set. */
+    public Long getEndTimestamp() {
+        return this.endTimestamp;
+    }
+
+    /**
+     * Records the desired slot count of every slot sharing group of the job.
+     *
+     * <p>The desired count of a group is the largest parallelism among its vertices, or 0 for a
+     * group without vertices.
+     *
+     * @param jobInformation source of the slot sharing groups and vertex parallelisms
+     * @return this rescale, for call chaining
+     */
+    public Rescale setDesiredSlots(JobInformation jobInformation) {
+        for (SlotSharingGroup group : jobInformation.getSlotSharingGroups()) {
+            final int largestParallelism =
+                    group.getJobVertexIds().stream()
+                            .mapToInt(
+                                    vertexId ->
+                                            jobInformation
+                                                    .getVertexInformation(vertexId)
+                                                    .getParallelism())
+                            .max()
+                            .orElse(0);
+            slots.computeIfAbsent(
+                            group.getSlotSharingGroupId(),
+                            ignored -> new SlotSharingGroupRescale(group))
+                    .setDesiredSlots(largestParallelism);
+        }
+        return this;
+    }
+
+    /**
+     * Records the required parallelisms of every vertex known to the job's parallelism store,
+     * creating the per-vertex entry on first use.
+     *
+     * @param jobInformation source of the parallelism store and vertex metadata
+     * @return this rescale, for call chaining
+     */
+    public Rescale setDesiredVertexParallelism(JobInformation jobInformation) {
+        jobInformation
+                .getVertexParallelismStore()
+                .getAllParallelismInfo()
+                .forEach(
+                        (vertexId, parallelismInfo) -> {
+                            JobInformation.VertexInformation info =
+                                    jobInformation.getVertexInformation(vertexId);
+                            SlotSharingGroup group = info.getSlotSharingGroup();
+                            String name = info.getVertexName();
+                            this.vertices
+                                    .computeIfAbsent(
+                                            vertexId,
+                                            id -> new VertexParallelismRescale(id, name, group))
+                                    .setRequiredParallelisms(parallelismInfo);
+                        });
+        return this;
+    }
+
+    /**
+     * Records, per slot sharing group, the minimal required slot count taken from the group's
+     * {@code SlotSharingGroupMetaInfo#getMaxLowerBound()}.
+     *
+     * @param jobInformation source of the job vertices
+     * @return this rescale, for call chaining
+     */
+    public Rescale setMinimalRequiredSlots(JobInformation jobInformation) {
+        SlotSharingGroupMetaInfo.from(jobInformation.getVertices())
+                .forEach(
+                        (group, metaInfo) ->
+                                slots.computeIfAbsent(
+                                                group.getSlotSharingGroupId(),
+                                                ignored -> new SlotSharingGroupRescale(group))
+                                        .setMinimalRequiredSlots(metaInfo.getMaxLowerBound()));
+        return this;
+    }
+
+    /**
+     * Seeds the pre-rescale parallelisms and slot counts of this rescale from the post-rescale
+     * values of the last completed rescale.
+     *
+     * <p>Nothing is changed when there is no previous rescale. A slot sharing group that the
+     * previous rescale did not record gets a {@code null} pre-rescale slot count.
+     *
+     * @param jobInformation source of the current vertices and slot sharing groups
+     * @param lastCompletedRescale the previous rescale, or {@code null} if there is none
+     * @return this rescale, for call chaining
+     */
+    public Rescale setPreRescaleSlotsAndParallelisms(
+            JobInformation jobInformation, @Nullable Rescale lastCompletedRescale) {
+        if (lastCompletedRescale == null) {
+            LOG.info("No available previous parallelism to set.");
+            return this;
+        }
+        for (Map.Entry<JobVertexID, VertexParallelismRescale> previousVertex :
+                lastCompletedRescale.vertices.entrySet()) {
+            JobVertexID jobVertexID = previousVertex.getKey();
+            Integer preRescaleParallelism = previousVertex.getValue().getPostRescaleParallelism();
+            JobInformation.VertexInformation vertexInformation =
+                    jobInformation.getVertexInformation(jobVertexID);
+            VertexParallelismRescale vertexParallelismRescale =
+                    vertices.computeIfAbsent(
+                            jobVertexID,
+                            jobVertexId ->
+                                    new VertexParallelismRescale(
+                                            jobVertexId,
+                                            vertexInformation.getVertexName(),
+                                            vertexInformation.getSlotSharingGroup()));
+            vertexParallelismRescale.setPreRescaleParallelism(preRescaleParallelism);
+        }
+
+        for (SlotSharingGroup sharingGroup : jobInformation.getSlotSharingGroups()) {
+            SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();
+            // The previous rescale may have no record for this group; do not dereference null.
+            SlotSharingGroupRescale previousGroup =
+                    lastCompletedRescale.slots.get(slotSharingGroupId);
+            Integer preRescaleSlot =
+                    previousGroup == null ? null : previousGroup.getPostRescaleSlots();
+            SlotSharingGroupRescale slotSharingGroupRescale =
+                    slots.computeIfAbsent(
+                            slotSharingGroupId,
+                            ignored -> new SlotSharingGroupRescale(sharingGroup));
+            slotSharingGroupRescale.setPreRescaleSlots(preRescaleSlot);
+        }
+
+        return this;
+    }
+
+    /**
+     * Records the post-rescale parallelism of every vertex contained in the given vertex
+     * parallelism, creating the per-vertex entry on first use.
+     *
+     * @param jobInformation source of the vertex name and slot sharing group
+     * @param postRescaleVertexParallelism the parallelisms after the rescale
+     * @return this rescale, for call chaining
+     */
+    public Rescale setPostRescaleVertexParallelism(
+            JobInformation jobInformation, VertexParallelism postRescaleVertexParallelism) {
+
+        for (JobVertexID rescaledVertexId : postRescaleVertexParallelism.getVertices()) {
+            JobInformation.VertexInformation info =
+                    jobInformation.getVertexInformation(rescaledVertexId);
+            VertexParallelismRescale vertexRescale =
+                    this.vertices.computeIfAbsent(
+                            rescaledVertexId,
+                            id ->
+                                    new VertexParallelismRescale(
+                                            id,
+                                            info.getVertexName(),
+                                            info.getSlotSharingGroup()));
+            vertexRescale.setPostRescaleParallelism(
+                    postRescaleVertexParallelism.getParallelism(rescaledVertexId));
+        }
+        return this;
+    }
+
+    /**
+     * Records, per slot sharing group, how many slots were assigned after the rescale and the
+     * resource profile of one of the assigned slots.
+     *
+     * @param postRescaleSlotAssignments the slot assignments after the rescale
+     * @return this rescale, for call chaining
+     */
+    public Rescale setPostRescaleSlots(
+            Collection<JobSchedulingPlan.SlotAssignment> postRescaleSlotAssignments) {
+        Map<SlotSharingGroup, Set<JobSchedulingPlan.SlotAssignment>> byGroup =
+                postRescaleSlotAssignments.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        assignment ->
+                                                assignment
+                                                        .getTargetAs(
+                                                                ExecutionSlotSharingGroup.class)
+                                                        .getSlotSharingGroup(),
+                                        Collectors.toSet()));
+        byGroup.forEach(
+                (group, groupAssignments) -> {
+                    int slotCount = groupAssignments.size();
+                    // Grouping never yields an empty set, so there is always a first element.
+                    ResourceProfile acquired =
+                            groupAssignments.iterator().next().getSlotInfo().getResourceProfile();
+                    SlotSharingGroupRescale groupRescale =
+                            slots.computeIfAbsent(
+                                    group.getSlotSharingGroupId(),
+                                    ignored -> new SlotSharingGroupRescale(group));
+                    groupRescale.setPostRescaleSlots(slotCount);
+                    groupRescale.setAcquiredResourceProfile(acquired);
+                });
+        return this;
+    }
+
+    /**
+     * Sets what triggered this rescale.
+     *
+     * @param triggerCause the trigger cause
+     * @return this rescale, for call chaining
+     */
+    public Rescale setTriggerCause(TriggerCause triggerCause) {
+        this.triggerCause = triggerCause;
+        return this;
+    }
+
+    /** Returns what triggered this rescale. */
+    public TriggerCause getTriggerCause() {
+        return this.triggerCause;
+    }
+
+    /** Logs the string form of this rescale at INFO level. */
+    public void log() {
+        final Rescale current = this;
+        LOG.info("Updated rescale is: {}", current);
+    }
+
+    /** Returns a read-only view of the per-vertex rescale information. */
+    public Map<JobVertexID, VertexParallelismRescale> getVertices() {
+        final Map<JobVertexID, VertexParallelismRescale> readOnlyView =
+                Collections.unmodifiableMap(this.vertices);
+        return readOnlyView;
+    }
+
+    /** Returns a read-only view of the per-slot-sharing-group rescale information. */
+    public Map<SlotSharingGroupId, SlotSharingGroupRescale> getSlots() {
+        final Map<SlotSharingGroupId, SlotSharingGroupRescale> readOnlyView =
+                Collections.unmodifiableMap(this.slots);
+        return readOnlyView;
+    }
+
+    /**
+     * Null-safe check whether the given rescale has reached a terminal state.
+     *
+     * @param rescale the rescale to inspect, may be {@code null}
+     * @return {@code true} only for a non-null, terminated rescale
+     */
+    public static boolean isTerminated(Rescale rescale) {
+        if (rescale == null) {
+            return false;
+        }
+        return rescale.isTerminated();
+    }
+
+    /**
+     * Sets the stringified exception held by this rescale.
+     *
+     * @param stringifiedException the stringified exception
+     * @return this rescale, for call chaining
+     */
+    public Rescale setStringifiedException(String stringifiedException) {
+        this.stringifiedException = stringifiedException;
+        return this;
+    }
+
+    /** Renders all fields of this rescale in a single-line {@code Rescale{...}} form. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("Rescale{");
+        text.append("stringifiedException='").append(stringifiedException).append('\'');
+        text.append(", rescaleIdInfo=").append(rescaleIdInfo);
+        text.append(", vertices=").append(vertices);
+        text.append(", slots=").append(slots);
+        text.append(", schedulerStates=").append(schedulerStates);
+        text.append(", startTimestamp=").append(startTimestamp);
+        text.append(", endTimestamp=").append(endTimestamp);
+        text.append(", triggerCause=").append(triggerCause);
+        text.append(", terminalState=").append(terminalState);
+        text.append(", terminatedReason=").append(terminatedReason);
+        return text.append('}').toString();
+    }
+
+    /** Exposes the backing, mutable per-vertex map. Intended for tests only. */
+    @VisibleForTesting
+    Map<JobVertexID, VertexParallelismRescale> getModifiableVertices() {
+        return this.vertices;
+    }
+
+    /** Exposes the backing, mutable per-slot-sharing-group map. Intended for tests only. */
+    @VisibleForTesting
+    Map<SlotSharingGroupId, SlotSharingGroupRescale> getModifiableSlots() {
+        return this.slots;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
new file mode 100644
index 00000000000..94d36cc1393
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Identifies one rescale: a freshly generated unique id, the id of the resource requirements the
+ * rescale belongs to, and the attempt number of the rescale.
+ */
+public class RescaleIdInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Unique id generated when this object is constructed. */
+    private final AbstractID rescaleUuid;
+
+    private final AbstractID resourceRequirementsId;
+    private final long rescaleAttemptId;
+
+    /**
+     * Creates the id information and generates a new {@code rescaleUuid}.
+     *
+     * @param resourceRequirementsId id of the resource requirements
+     * @param rescaleAttemptId attempt number of the rescale, must not be {@code null}
+     * @throws NullPointerException if {@code rescaleAttemptId} is {@code null}
+     */
+    public RescaleIdInfo(AbstractID resourceRequirementsId, Long rescaleAttemptId) {
+        this.resourceRequirementsId = resourceRequirementsId;
+        // Fail with a descriptive message instead of an anonymous NPE from auto-unboxing.
+        this.rescaleAttemptId =
+                Objects.requireNonNull(rescaleAttemptId, "rescaleAttemptId must not be null");
+        this.rescaleUuid = new AbstractID();
+    }
+
+    public AbstractID getRescaleUuid() {
+        return rescaleUuid;
+    }
+
+    public AbstractID getResourceRequirementsId() {
+        return resourceRequirementsId;
+    }
+
+    public long getRescaleAttemptId() {
+        return rescaleAttemptId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RescaleIdInfo rescaleIdInfo = (RescaleIdInfo) o;
+        return rescaleAttemptId == rescaleIdInfo.rescaleAttemptId
+                && Objects.equals(rescaleUuid, rescaleIdInfo.rescaleUuid)
+                && Objects.equals(resourceRequirementsId, rescaleIdInfo.resourceRequirementsId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rescaleUuid, resourceRequirementsId, rescaleAttemptId);
+    }
+
+    @Override
+    public String toString() {
+        // Use the actual class name; the previous "IdEpoch" prefix did not match this type.
+        return "RescaleIdInfo{"
+                + "rescaleUuid="
+                + rescaleUuid
+                + ", resourceRequirementsId="
+                + resourceRequirementsId
+                + ", rescaleAttemptId="
+                + rescaleAttemptId
+                + '}';
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
new file mode 100644
index 00000000000..1c7e1d02ec3
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Describes the time an adaptive scheduler state was active: the state name, its enter and leave
+ * timestamps, the resulting duration and the stringified exception, if one occurred in the state.
+ */
+public class SchedulerStateSpan implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Name of the scheduler state; never {@code null}. */
+    private final String state;
+
+    @Nullable private final Long enterTimestamp;
+
+    @Nullable private final Long leaveTimestamp;
+
+    @Nullable private final Long duration;
+
+    /** The only mutable field; may be replaced after construction. */
+    @Nullable private String stringifiedException;
+
+    /**
+     * Creates a span.
+     *
+     * @param state name of the scheduler state, must not be {@code null}
+     * @param logicEnterMillis timestamp at which the state was entered
+     * @param logicLeaveMillis timestamp at which the state was left
+     * @param duration time spent in the state
+     * @param stringifiedException stringified exception observed in the state, if any
+     */
+    public SchedulerStateSpan(
+            String state,
+            Long logicEnterMillis,
+            Long logicLeaveMillis,
+            Long duration,
+            String stringifiedException) {
+        this.state = Preconditions.checkNotNull(state);
+        this.enterTimestamp = logicEnterMillis;
+        this.leaveTimestamp = logicLeaveMillis;
+        this.duration = duration;
+        this.stringifiedException = stringifiedException;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    @Nullable
+    public Long getEnterTimestamp() {
+        return enterTimestamp;
+    }
+
+    @Nullable
+    public Long getLeaveTimestamp() {
+        return leaveTimestamp;
+    }
+
+    @Nullable
+    public Long getDuration() {
+        return duration;
+    }
+
+    @Nullable
+    public String getStringifiedException() {
+        return stringifiedException;
+    }
+
+    public void setStringifiedException(@Nullable String stringifiedException) {
+        this.stringifiedException = stringifiedException;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SchedulerStateSpan other = (SchedulerStateSpan) o;
+        return Objects.equals(state, other.state)
+                && Objects.equals(enterTimestamp, other.enterTimestamp)
+                && Objects.equals(leaveTimestamp, other.leaveTimestamp)
+                && Objects.equals(duration, other.duration)
+                && Objects.equals(stringifiedException, other.stringifiedException);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(state, enterTimestamp, leaveTimestamp, duration, stringifiedException);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("SchedulerStateSpan{");
+        text.append("state='").append(state).append('\'');
+        text.append(", enterTimestamp=").append(enterTimestamp);
+        text.append(", leaveTimestamp=").append(leaveTimestamp);
+        text.append(", duration=").append(duration);
+        text.append(", stringifiedException='").append(stringifiedException).append('\'');
+        return text.append('}').toString();
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SlotSharingGroupRescale.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SlotSharingGroupRescale.java
new file mode 100644
index 00000000000..84ee73275c0
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SlotSharingGroupRescale.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Slot-related rescale information of one {@link
+ * org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}: its identity and required
+ * resource profile plus the desired, minimal, pre-rescale and post-rescale slot counts.
+ */
+public class SlotSharingGroupRescale implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Identity and requirements, copied from the slot sharing group at construction time.
+    private final SlotSharingGroupId slotSharingGroupId;
+    private final String slotSharingGroupName;
+    private final ResourceProfile requiredResourceProfile;
+
+    // Values filled in while the rescale progresses.
+    private Integer desiredSlots;
+    private Integer minimalRequiredSlots;
+    @Nullable private Integer preRescaleSlots;
+    @Nullable private Integer postRescaleSlots;
+    @Nullable private ResourceProfile acquiredResourceProfile;
+
+    /**
+     * Creates the rescale information for the given slot sharing group.
+     *
+     * @param slotSharingGroup group whose id, name and resource profile are captured
+     */
+    public SlotSharingGroupRescale(SlotSharingGroup slotSharingGroup) {
+        this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
+        this.slotSharingGroupName = slotSharingGroup.getSlotSharingGroupName();
+        this.requiredResourceProfile = slotSharingGroup.getResourceProfile();
+    }
+
+    public SlotSharingGroupId getSlotSharingGroupId() {
+        return slotSharingGroupId;
+    }
+
+    public String getSlotSharingGroupName() {
+        return slotSharingGroupName;
+    }
+
+    public ResourceProfile getRequiredResourceProfile() {
+        return requiredResourceProfile;
+    }
+
+    public Integer getDesiredSlots() {
+        return desiredSlots;
+    }
+
+    public void setDesiredSlots(Integer desiredSlots) {
+        this.desiredSlots = desiredSlots;
+    }
+
+    public Integer getMinimalRequiredSlots() {
+        return minimalRequiredSlots;
+    }
+
+    public void setMinimalRequiredSlots(Integer minimalRequiredSlots) {
+        this.minimalRequiredSlots = minimalRequiredSlots;
+    }
+
+    @Nullable
+    public Integer getPreRescaleSlots() {
+        return preRescaleSlots;
+    }
+
+    public void setPreRescaleSlots(Integer preRescaleSlots) {
+        this.preRescaleSlots = preRescaleSlots;
+    }
+
+    @Nullable
+    public Integer getPostRescaleSlots() {
+        return postRescaleSlots;
+    }
+
+    public void setPostRescaleSlots(Integer postRescaleSlots) {
+        this.postRescaleSlots = postRescaleSlots;
+    }
+
+    @Nullable
+    public ResourceProfile getAcquiredResourceProfile() {
+        return acquiredResourceProfile;
+    }
+
+    public void setAcquiredResourceProfile(ResourceProfile acquiredResourceProfile) {
+        this.acquiredResourceProfile = acquiredResourceProfile;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SlotSharingGroupRescale other = (SlotSharingGroupRescale) o;
+        return Objects.equals(slotSharingGroupId, other.slotSharingGroupId)
+                && Objects.equals(slotSharingGroupName, other.slotSharingGroupName)
+                && Objects.equals(requiredResourceProfile, other.requiredResourceProfile)
+                && Objects.equals(desiredSlots, other.desiredSlots)
+                && Objects.equals(minimalRequiredSlots, other.minimalRequiredSlots)
+                && Objects.equals(preRescaleSlots, other.preRescaleSlots)
+                && Objects.equals(postRescaleSlots, other.postRescaleSlots)
+                && Objects.equals(acquiredResourceProfile, other.acquiredResourceProfile);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                slotSharingGroupId,
+                slotSharingGroupName,
+                requiredResourceProfile,
+                desiredSlots,
+                minimalRequiredSlots,
+                preRescaleSlots,
+                postRescaleSlots,
+                acquiredResourceProfile);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("SlotSharingGroupRescale{");
+        text.append("slotSharingGroupId=").append(slotSharingGroupId);
+        text.append(", slotSharingGroupName='").append(slotSharingGroupName).append('\'');
+        text.append(", requiredResourceProfile=").append(requiredResourceProfile);
+        text.append(", desiredSlots=").append(desiredSlots);
+        text.append(", minimalRequiredSlots=").append(minimalRequiredSlots);
+        text.append(", preRescaleSlots=").append(preRescaleSlots);
+        text.append(", postRescaleSlots=").append(postRescaleSlots);
+        text.append(", acquiredResourceProfile=").append(acquiredResourceProfile);
+        return text.append('}').toString();
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminalState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminalState.java
new file mode 100644
index 00000000000..cf9272b44b3
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminalState.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+/** The final outcome of a rescale; each constant carries a human-readable description. */
+public enum TerminalState {
+    /** The rescale finished successfully. */
+    COMPLETED("It represents the rescale was completed successfully"),
+
+    /** The rescale failed. */
+    FAILED(
+            "It represents the rescale was failed due to some exceptions about lack of "
+                    + "condition resources."),
+
+    /** The rescale was superseded or became unnecessary; see the description for examples. */
+    IGNORED(
+            "It represents the rescale was ignored by some new conditions that could trigger "
+                    + "a new rescale.\n"
+                    + "For example,\n"
+                    + " The scheduler has received a new resource request,\n"
+                    + " A job restart is triggered during rescale due to any exception,\n"
+                    + " Available resources or parallelism do not change during rescale,\n"
+                    + " The job reaches a terminal state during rescale: "
+                    + "FINISHED, FAILING, CANCELING.");
+
+    private final String description;
+
+    TerminalState(String description) {
+        this.description = description;
+    }
+
+    /** Returns the human-readable description of this terminal state. */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /**
+     * Tells whether a terminal state has been assigned.
+     *
+     * @param terminalState the state to inspect, may be {@code null}
+     * @return {@code true} if the argument is not {@code null}
+     */
+    public static boolean isTerminated(TerminalState terminalState) {
+        return !(terminalState == null);
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminatedReason.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminatedReason.java
new file mode 100644
index 00000000000..3014a02029c
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TerminatedReason.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+/**
+ * The reason why a rescale event terminated. Every reason maps to exactly one {@link
+ * TerminalState} and carries a human-readable description.
+ */
+public enum TerminatedReason {
+    SUCCEEDED(TerminalState.COMPLETED, "The rescale was completed successfully."),
+
+    EXCEPTION_OCCURRED(
+            TerminalState.FAILED,
+            "The rescale was failed due to some exceptions about no resources enough, etc."),
+
+    RESOURCE_REQUIREMENTS_UPDATED(
+            TerminalState.IGNORED,
+            "The rescale was ignored due to the new resource requirements."),
+
+    NO_RESOURCES_OR_PARALLELISMS_CHANGE(
+            TerminalState.IGNORED,
+            "The rescale was ignored due to no available resources change or parallelism "
+                    + "change."),
+
+    JOB_FINISHED(TerminalState.IGNORED, "The rescale was ignored due to the job finished."),
+
+    JOB_FAILED(TerminalState.IGNORED, "The rescale was ignored due to the job failed."),
+
+    JOB_CANCELED(TerminalState.IGNORED, "The rescale was ignored due to the job canceled."),
+
+    /**
+     * This value may be removed later, see
+     * https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h: the current
+     * non-terminated rescale and the new rescale triggered by a recoverable failover would then
+     * be merged into the current rescale.
+     */
+    JOB_FAILOVER_RESTARTING(
+            TerminalState.IGNORED,
+            "The rescale was ignored due to the job failover restarting.");
+
+    private final TerminalState terminalState;
+    private final String description;
+
+    TerminatedReason(TerminalState terminalState, String description) {
+        this.terminalState = terminalState;
+        this.description = description;
+    }
+
+    /** Returns the terminal state this reason maps to. */
+    public TerminalState getTerminalState() {
+        return this.terminalState;
+    }
+
+    /** Returns the human-readable description of this reason. */
+    public String getDescription() {
+        return this.description;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TriggerCause.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TriggerCause.java
new file mode 100644
index 00000000000..584bdba88c8
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TriggerCause.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+/** The cause that triggered a rescale; each constant carries a human-readable description. */
+public enum TriggerCause {
+    INITIAL_SCHEDULE("The first schedule of the job starting triggered the rescale."),
+    UPDATE_REQUIREMENT("Updating job resource requirements triggered the rescale."),
+    NEW_RESOURCE_AVAILABLE("That new resources were available triggered the rescale."),
+    RECOVERABLE_FAILOVER("Recoverable failover triggered the rescale.");
+
+    private final String description;
+
+    TriggerCause(String description) {
+        this.description = description;
+    }
+
+    /** Returns the human-readable description of this trigger cause. */
+    public String getDescription() {
+        return description;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
new file mode 100644
index 00000000000..3ea9476e79a
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Parallelism-related rescale information of one {@link
+ * org.apache.flink.runtime.jobgraph.JobVertex}: its identity, its slot sharing group and the
+ * desired, sufficient, pre-rescale and post-rescale parallelisms.
+ */
+public class VertexParallelismRescale implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final JobVertexID jobVertexId;
+    private String jobVertexName;
+    private SlotSharingGroupId slotSharingGroupId;
+    private String slotSharingGroupName;
+
+    // Both values are set together by setRequiredParallelisms.
+    private Integer desiredParallelism;
+    private Integer sufficientParallelism;
+
+    @Nullable private Integer preRescaleParallelism;
+
+    @Nullable private Integer postRescaleParallelism;
+
+    /**
+     * Creates the rescale information of a vertex.
+     *
+     * @param jobVertexId id of the vertex, must not be {@code null}
+     * @param jobVertexName name of the vertex
+     * @param slotSharingGroup group of the vertex; its id and name are captured
+     */
+    public VertexParallelismRescale(
+            JobVertexID jobVertexId, String jobVertexName, SlotSharingGroup slotSharingGroup) {
+        this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+        this.jobVertexName = jobVertexName;
+        this.slotSharingGroupName = slotSharingGroup.getSlotSharingGroupName();
+        this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
+    }
+
+    public JobVertexID getJobVertexId() {
+        return jobVertexId;
+    }
+
+    public String getJobVertexName() {
+        return jobVertexName;
+    }
+
+    public void setJobVertexName(String jobVertexName) {
+        this.jobVertexName = jobVertexName;
+    }
+
+    public SlotSharingGroupId getSlotSharingGroupId() {
+        return slotSharingGroupId;
+    }
+
+    public String getSlotSharingGroupName() {
+        return slotSharingGroupName;
+    }
+
+    public Integer getDesiredParallelism() {
+        return desiredParallelism;
+    }
+
+    public Integer getSufficientParallelism() {
+        return sufficientParallelism;
+    }
+
+    /**
+     * Takes the sufficient parallelism from {@code getMinParallelism()} and the desired
+     * parallelism from {@code getParallelism()} of the given information.
+     */
+    public void setRequiredParallelisms(VertexParallelismInformation vertexParallelismInformation) {
+        this.desiredParallelism = vertexParallelismInformation.getParallelism();
+        this.sufficientParallelism = vertexParallelismInformation.getMinParallelism();
+    }
+
+    @Nullable
+    public Integer getPreRescaleParallelism() {
+        return preRescaleParallelism;
+    }
+
+    public void setPreRescaleParallelism(@Nullable Integer preRescaleParallelism) {
+        this.preRescaleParallelism = preRescaleParallelism;
+    }
+
+    @Nullable
+    public Integer getPostRescaleParallelism() {
+        return postRescaleParallelism;
+    }
+
+    public void setPostRescaleParallelism(Integer postRescaleParallelism) {
+        this.postRescaleParallelism = postRescaleParallelism;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        VertexParallelismRescale other = (VertexParallelismRescale) o;
+        return Objects.equals(jobVertexId, other.jobVertexId)
+                && Objects.equals(jobVertexName, other.jobVertexName)
+                && Objects.equals(slotSharingGroupId, other.slotSharingGroupId)
+                && Objects.equals(slotSharingGroupName, other.slotSharingGroupName)
+                && Objects.equals(preRescaleParallelism, other.preRescaleParallelism)
+                && Objects.equals(desiredParallelism, other.desiredParallelism)
+                && Objects.equals(sufficientParallelism, other.sufficientParallelism)
+                && Objects.equals(postRescaleParallelism, other.postRescaleParallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jobVertexId,
+                jobVertexName,
+                slotSharingGroupId,
+                slotSharingGroupName,
+                preRescaleParallelism,
+                desiredParallelism,
+                sufficientParallelism,
+                postRescaleParallelism);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("VertexParallelismRescale{");
+        text.append("jobVertexId=").append(jobVertexId);
+        text.append(", jobVertexName='").append(jobVertexName).append('\'');
+        text.append(", slotSharingGroupId=").append(slotSharingGroupId);
+        text.append(", slotSharingGroupName='").append(slotSharingGroupName).append('\'');
+        text.append(", desiredParallelism=").append(desiredParallelism);
+        text.append(", sufficientParallelism=").append(sufficientParallelism);
+        text.append(", preRescaleParallelism=").append(preRescaleParallelism);
+        text.append(", postRescaleParallelism=").append(postRescaleParallelism);
+        return text.append('}').toString();
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1aa62761d43..ae948dc3852 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1926,6 +1926,8 @@ public class StreamingJobGraphGenerator {
// fallback to the region slot sharing group by default
effectiveSlotSharingGroup =
checkNotNull(vertexRegionSlotSharingGroups.get(vertex.getID()));
+ effectiveSlotSharingGroup.setSlotSharingGroupName(
+ StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP);
} else {
checkState(
!jobVertexBuildContext.hasHybridResultPartition(),
@@ -1941,6 +1943,7 @@ public class StreamingJobGraphGenerator {
.ifPresent(ssg::setResourceProfile);
return ssg;
});
+
effectiveSlotSharingGroup.setSlotSharingGroupName(slotSharingGroupKey);
}
vertex.setSlotSharingGroup(effectiveSlotSharingGroup);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 862722407de..9fccfb70fac 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -63,6 +63,7 @@ import
org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import
org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
@@ -1040,6 +1041,11 @@ class ExecutingTest {
@Override
public void cancel() {}
+ // Testing stub: this state supplies no Durable, so callers receive null.
+ @Override
+ public Durable getDurable() {
+ return null;
+ }
+
@Override
public void suspend(Throwable cause) {}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
index bc42c9422fa..8d776e5c58e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
@@ -82,6 +82,11 @@ public class TestVertexInformation implements
JobInformation.VertexInformation {
return jobVertexId;
}
+ // Synthesizes a vertex name for tests: the fixed prefix "JobVertex-" followed by the
+ // string form of the vertex id.
+ @Override
+ public String getVertexName() {
+ return "JobVertex-" + jobVertexId.toString();
+ }
+
@Override
public int getMinParallelism() {
return minParallelism;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
new file mode 100644
index 00000000000..3d0775860b5
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import org.apache.flink.runtime.scheduler.adaptive.State;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Rescale}. */
+class RescaleTest {
+
+    /**
+     * Function argument required by {@link DefaultVertexParallelismInfo}; it always yields an
+     * empty {@link Optional} because the tests never evaluate its result.
+     */
+    private final Function<Integer, Optional<String>> ignoredFunction = integer -> Optional.empty();
+
+    private final SlotSharingGroup slotSharingGroupA = new SlotSharingGroup();
+    private final SlotSharingGroup slotSharingGroupB = new SlotSharingGroup();
+
+    private List<SlotSharingGroup> slotSharingGroups;
+
+    private JobVertex jobVertex1OfSlotSharingGroupA;
+    private JobVertex jobVertex2OfSlotSharingGroupA;
+    private JobVertex jobVertex1OfSlotSharingGroupB;
+    private JobVertex jobVertex2OfSlotSharingGroupB;
+
+    private List<JobVertex> jobVertices;
+
+    private JobGraphJobInformation.JobVertexInformation jobVertexInformation1OfSlotSharingGroupA;
+    private JobGraphJobInformation.JobVertexInformation jobVertexInformation2OfSlotSharingGroupA;
+    private JobGraphJobInformation.JobVertexInformation jobVertexInformation1OfSlotSharingGroupB;
+    private JobGraphJobInformation.JobVertexInformation jobVertexInformation2OfSlotSharingGroupB;
+
+    private JobInformation jobInformation;
+
+    /**
+     * Builds the shared fixture: two named slot sharing groups ("ssgA", "ssgB") with two job
+     * vertices each, a parallelism store holding one entry per vertex, and a {@link
+     * TestingJobInformation} that exposes all of them.
+     *
+     * <p>The collections are created with {@code List.of} / {@code new HashSet<>(...)} rather
+     * than double-brace initialization, which would create anonymous subclasses that capture
+     * this test instance.
+     */
+    @BeforeEach
+    void setup() {
+        slotSharingGroupA.setSlotSharingGroupName("ssgA");
+        slotSharingGroupB.setSlotSharingGroupName("ssgB");
+        slotSharingGroups = List.of(slotSharingGroupA, slotSharingGroupB);
+
+        jobVertex1OfSlotSharingGroupA = new JobVertex("JobVertex1OfSlotSharingGroupA");
+        jobVertex1OfSlotSharingGroupA.setSlotSharingGroup(slotSharingGroupA);
+        jobVertex2OfSlotSharingGroupA = new JobVertex("JobVertex2OfSlotSharingGroupA");
+        jobVertex2OfSlotSharingGroupA.setSlotSharingGroup(slotSharingGroupA);
+
+        jobVertex1OfSlotSharingGroupB = new JobVertex("JobVertex1OfSlotSharingGroupB");
+        jobVertex1OfSlotSharingGroupB.setSlotSharingGroup(slotSharingGroupB);
+        jobVertex2OfSlotSharingGroupB = new JobVertex("JobVertex2OfSlotSharingGroupB");
+        jobVertex2OfSlotSharingGroupB.setSlotSharingGroup(slotSharingGroupB);
+
+        jobVertices =
+                List.of(
+                        jobVertex1OfSlotSharingGroupA,
+                        jobVertex2OfSlotSharingGroupA,
+                        jobVertex1OfSlotSharingGroupB,
+                        jobVertex2OfSlotSharingGroupB);
+
+        // The tests in this class expect the first number of each triple to show up as the
+        // sufficient parallelism and the second one as the desired parallelism.
+        jobVertexInformation1OfSlotSharingGroupA =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex1OfSlotSharingGroupA,
+                        new DefaultVertexParallelismInfo(1, 2, 4, ignoredFunction));
+        jobVertexInformation2OfSlotSharingGroupA =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex2OfSlotSharingGroupA,
+                        new DefaultVertexParallelismInfo(2, 3, 8, ignoredFunction));
+        jobVertexInformation1OfSlotSharingGroupB =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex1OfSlotSharingGroupB,
+                        new DefaultVertexParallelismInfo(3, 4, 16, ignoredFunction));
+        jobVertexInformation2OfSlotSharingGroupB =
+                new JobGraphJobInformation.JobVertexInformation(
+                        jobVertex2OfSlotSharingGroupB,
+                        new DefaultVertexParallelismInfo(4, 5, 32, ignoredFunction));
+
+        DefaultVertexParallelismStore defaultVertexParallelismStore =
+                new DefaultVertexParallelismStore();
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex1OfSlotSharingGroupA.getID(),
+                jobVertexInformation1OfSlotSharingGroupA.getVertexParallelismInfo());
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex2OfSlotSharingGroupA.getID(),
+                jobVertexInformation2OfSlotSharingGroupA.getVertexParallelismInfo());
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex1OfSlotSharingGroupB.getID(),
+                jobVertexInformation1OfSlotSharingGroupB.getVertexParallelismInfo());
+        defaultVertexParallelismStore.setParallelismInfo(
+                jobVertex2OfSlotSharingGroupB.getID(),
+                jobVertexInformation2OfSlotSharingGroupB.getVertexParallelismInfo());
+
+        jobInformation =
+                new TestingJobInformation(
+                        new HashSet<>(slotSharingGroups),
+                        jobVertices,
+                        defaultVertexParallelismStore);
+    }
+
+ // Covers Rescale#addSchedulerState: handling of terminated rescales, hand-over of the
+ // stringified exception, and filling in a missing leave timestamp.
+ @Test
+ void testAddSchedulerState() {
+ Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+
+ // A state span added to an already terminated rescale must be dropped.
+ rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+ rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null,
null));
+ assertThat(rescale.getSchedulerStates()).isEmpty();
+
+ // A state span added to a non-terminated rescale must be recorded.
+ rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null,
null));
+ assertThat(rescale.getSchedulerStates()).hasSize(1);
+
+ // An exception set on the rescale must be handed over to the next added span and
+ // cleared from the rescale itself.
+ String stringifiedException1 =
+ ExceptionUtils.stringifyException(new
RuntimeException("exception1"));
+ rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.setStringifiedException(stringifiedException1);
+ rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null,
null));
+ SchedulerStateSpan schedulerStateSpan =
rescale.getSchedulerStates().get(0);
+
assertThat(schedulerStateSpan.getStringifiedException()).isEqualTo(stringifiedException1);
+ assertThat(rescale.getStringifiedException()).isNull();
+
+ // An exception that the added span already carries must be kept.
+ rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.addSchedulerState(
+ new SchedulerStateSpan("", null, null, null,
stringifiedException1));
+ schedulerStateSpan = rescale.getSchedulerStates().get(0);
+
assertThat(schedulerStateSpan.getStringifiedException()).isEqualTo(stringifiedException1);
+
+ // The state is created with a null leave timestamp; the recorded span's leave
+ // timestamp is expected to be filled in automatically and must not lie in the future.
+ State stateWithoutEndTimestamp = new TestingAdaptiveSchedulerState(2L,
null);
+ rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.setStringifiedException(stringifiedException1);
+ rescale.addSchedulerState(stateWithoutEndTimestamp, null);
+ schedulerStateSpan = rescale.getSchedulerStates().get(0);
+ assertThat(schedulerStateSpan.getLeaveTimestamp())
+ .isLessThanOrEqualTo(Instant.now().toEpochMilli());
+ }
+
+ @Test
+ void testSetDesiredSlots() {
+ Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.setDesiredSlots(jobInformation);
+ Map<SlotSharingGroupId, SlotSharingGroupRescale> slots =
rescale.getSlots();
+ assertThat(slots).hasSize(2);
+ assertThat(slots.keySet())
+ .hasSameElementsAs(
+ slotSharingGroups.stream()
+ .map(SlotSharingGroup::getSlotSharingGroupId)
+ .collect(Collectors.toSet()));
+ assertThat(
+ slots.values().stream()
+ .map(SlotSharingGroupRescale::getDesiredSlots)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(3, 5);
+ assertThat(
+ slots.values().stream()
+
.map(SlotSharingGroupRescale::getRequiredResourceProfile)
+ .collect(Collectors.toSet()))
+ .containsExactly(ResourceProfile.UNKNOWN);
+ }
+
+ // Expects one entry per fixture vertex, each carrying the vertex id/name, the slot
+ // sharing group id/name, and the desired and sufficient parallelism from setup().
+ @Test
+ void testSetDesiredVertexParallelism() {
+ Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.setDesiredVertexParallelism(jobInformation);
+ Map<JobVertexID, VertexParallelismRescale> vertices =
rescale.getVertices();
+ assertThat(vertices).hasSize(4);
+ // The map keys must match the ids stored inside the values, and both must match the
+ // ids of the fixture vertices.
+ assertThat(vertices.keySet())
+ .isEqualTo(
+ vertices.values().stream()
+ .map(VertexParallelismRescale::getJobVertexId)
+ .collect(Collectors.toSet()))
+ .hasSameElementsAs(
+
jobVertices.stream().map(JobVertex::getID).collect(Collectors.toSet()));
+
+ // Vertex names are taken over from the job vertices.
+ assertThat(
+ vertices.values().stream()
+
.map(VertexParallelismRescale::getJobVertexName)
+ .collect(Collectors.toSet()))
+ .hasSameElementsAs(
+
jobVertices.stream().map(JobVertex::getName).collect(Collectors.toSet()));
+
+ // Slot sharing group ids are taken over from the job vertices' groups.
+ assertThat(
+ vertices.values().stream()
+
.map(VertexParallelismRescale::getSlotSharingGroupId)
+ .collect(Collectors.toSet()))
+ .hasSameElementsAs(
+ jobVertices.stream()
+ .map(jv ->
jv.getSlotSharingGroup().getSlotSharingGroupId())
+ .collect(Collectors.toSet()));
+
+ // Slot sharing group names are taken over from the job vertices' groups.
+ assertThat(
+ vertices.values().stream()
+
.map(VertexParallelismRescale::getSlotSharingGroupName)
+ .collect(Collectors.toSet()))
+ .hasSameElementsAs(
+ jobVertices.stream()
+ .map(jv ->
jv.getSlotSharingGroup().getSlotSharingGroupName())
+ .collect(Collectors.toSet()));
+
+ // Desired parallelism: the second number of each fixture triple in setup().
+ assertThat(
+ vertices.values().stream()
+
.map(VertexParallelismRescale::getDesiredParallelism)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(2, 3, 4, 5);
+ // Sufficient parallelism: the first number of each fixture triple in setup().
+ assertThat(
+ vertices.values().stream()
+
.map(VertexParallelismRescale::getSufficientParallelism)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(1, 2, 3, 4);
+ }
+
+ @Test
+ void testSetMinimalRequiredSlots() {
+ Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.setMinimalRequiredSlots(jobInformation);
+ Map<SlotSharingGroupId, SlotSharingGroupRescale> slots =
rescale.getSlots();
+ assertThat(slots).hasSize(2);
+ assertThat(slots.keySet())
+ .containsExactlyInAnyOrder(
+ slotSharingGroupA.getSlotSharingGroupId(),
+ slotSharingGroupB.getSlotSharingGroupId());
+ assertThat(
+ slots.values().stream()
+
.map(SlotSharingGroupRescale::getMinimalRequiredSlots)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(2, 4);
+ }
+
+ @Test
+ void testSetPreRescaleSlotsAndParallelisms() {
+ // Test for null last rescale.
+ Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 2L));
+ rescale.setPreRescaleSlotsAndParallelisms(jobInformation, null);
+ assertThat(rescale.getSlots()).isEmpty();
+ assertThat(rescale.getVertices()).isEmpty();
+
+ // Test for non-null last rescale.
+ // Prepare the last completed rescale.
+ Rescale lastCompletedRescale = new Rescale(new RescaleIdInfo(new
AbstractID(), 1L));
+ Map<JobVertexID, VertexParallelismRescale> lastRescaleVertices =
+ lastCompletedRescale.getModifiableVertices();
+ jobVertices.forEach(
+ jobVertex -> {
+ VertexParallelismRescale vertexParallelismRescale =
+ lastRescaleVertices.computeIfAbsent(
+ jobVertex.getID(),
+ ignored ->
+ new VertexParallelismRescale(
+ jobVertex.getID(),
+ jobInformation
+
.getVertexInformation(jobVertex.getID())
+ .getVertexName(),
+ jobInformation
+
.getVertexInformation(jobVertex.getID())
+
.getSlotSharingGroup()));
+ vertexParallelismRescale.setPostRescaleParallelism(4);
+ });
+ Map<SlotSharingGroupId, SlotSharingGroupRescale>
lastRescaleSlotSharingGroups =
+ lastCompletedRescale.getModifiableSlots();
+ slotSharingGroups.forEach(
+ group -> {
+ SlotSharingGroupRescale slotSharingGroupRescale =
+ lastRescaleSlotSharingGroups.computeIfAbsent(
+ group.getSlotSharingGroupId(),
+ ignored -> new
SlotSharingGroupRescale(group));
+ slotSharingGroupRescale.setPostRescaleSlots(4);
+ });
+
+ rescale.setPreRescaleSlotsAndParallelisms(jobInformation,
lastCompletedRescale);
+
+ assertThat(rescale.getSlots()).hasSize(2);
+ assertThat(
+ rescale.getSlots().values().stream()
+
.map(SlotSharingGroupRescale::getPreRescaleSlots)
+ .collect(Collectors.toSet()))
+ .containsExactly(4);
+
+ assertThat(
+ rescale.getVertices().values().stream()
+
.map(VertexParallelismRescale::getPreRescaleParallelism)
+ .collect(Collectors.toSet()))
+ .containsExactly(4);
+ }
+
+    // Expects every fixture vertex to report the post-rescale parallelism (2) that the
+    // given VertexParallelism assigns to it.
+    @Test
+    void testSetPostRescaleVertexParallelism() {
+        // Fill the map before wrapping it, so the test does not depend on whether
+        // VertexParallelism keeps a live reference to the caller's map or copies it.
+        Map<JobVertexID, Integer> parallelismForVertices = new HashMap<>();
+        for (JobVertex vertex : jobVertices) {
+            parallelismForVertices.put(vertex.getID(), 2);
+        }
+        VertexParallelism postVertexParallelism = new VertexParallelism(parallelismForVertices);
+
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setPostRescaleVertexParallelism(jobInformation, postVertexParallelism);
+
+        // Collected into a set, so all vertices must share the single value 2.
+        assertThat(
+                        rescale.getVertices().values().stream()
+                                .map(VertexParallelismRescale::getPostRescaleParallelism)
+                                .collect(Collectors.toSet()))
+                .containsExactly(2);
+    }
+
+    // Expects the post-rescale slot count of each group to equal the number of slot
+    // assignments made for that group (2 for group A, 4 for group B).
+    @Test
+    void testSetPostRescaleSlots() {
+        List<JobSchedulingPlan.SlotAssignment> slotAssignments = new ArrayList<>();
+        for (int i = 0; i < 2; i++) {
+            slotAssignments.add(newSlotAssignment(slotSharingGroupA));
+        }
+        for (int i = 0; i < 4; i++) {
+            slotAssignments.add(newSlotAssignment(slotSharingGroupB));
+        }
+
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setPostRescaleSlots(slotAssignments);
+
+        // The values are collected into an unordered set, so the assertion must not depend
+        // on iteration order.
+        assertThat(
+                        rescale.getSlots().values().stream()
+                                .map(SlotSharingGroupRescale::getPostRescaleSlots)
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(2, 4);
+        assertThat(
+                        rescale.getSlots().values().stream()
+                                .map(SlotSharingGroupRescale::getAcquiredResourceProfile)
+                                .collect(Collectors.toSet()))
+                .containsExactly(ResourceProfile.UNKNOWN);
+    }
+
+    /** Creates an assignment of a new UNKNOWN-profile testing slot to the given group. */
+    private static JobSchedulingPlan.SlotAssignment newSlotAssignment(SlotSharingGroup group) {
+        return new JobSchedulingPlan.SlotAssignment(
+                new TestingSlot(ResourceProfile.UNKNOWN),
+                new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(group, null));
+    }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingAdaptiveSchedulerState.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingAdaptiveSchedulerState.java
new file mode 100644
index 00000000000..4c35f917da1
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingAdaptiveSchedulerState.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.State;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Testing implementation of {@link State} that only carries a {@link Durable}; every other
+ * method is a no-op or returns {@code null}.
+ */
+public class TestingAdaptiveSchedulerState implements State {
+
+ private final Durable durable;
+
+ /**
+ * Creates a state whose {@link Durable} is built from the two given timestamps.
+ *
+ * @param inTimestamp first argument passed to the {@link Durable} constructor
+ * @param outTimestamp second argument passed to the {@link Durable} constructor, may be
+ *     {@code null}
+ */
+ TestingAdaptiveSchedulerState(Long inTimestamp, @Nullable Long
outTimestamp) {
+ this.durable = new Durable(inTimestamp, outTimestamp);
+ }
+
+ @Override
+ public Durable getDurable() {
+ return durable;
+ }
+
+ // The remaining State methods are stubs: they do nothing or return null.
+
+ @Override
+ public void cancel() {}
+
+ @Override
+ public void suspend(Throwable cause) {}
+
+ @Override
+ public JobID getJobId() {
+ return null;
+ }
+
+ @Override
+ public JobStatus getJobStatus() {
+ return null;
+ }
+
+ @Override
+ public ArchivedExecutionGraph getJob() {
+ return null;
+ }
+
+ @Override
+ public Logger getLogger() {
+ return null;
+ }
+
+ @Override
+ public void handleGlobalFailure(
+ Throwable cause, CompletableFuture<Map<String, String>>
failureLabels) {}
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingJobInformation.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingJobInformation.java
new file mode 100644
index 00000000000..3227345f1ed
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/TestingJobInformation.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+
+import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link JobInformation} for tests, backed by explicitly supplied slot sharing groups, job
+ * vertices and a {@link VertexParallelismStore}. The set of co-location groups is always empty.
+ */
+public class TestingJobInformation implements JobInformation {
+
+    private final Set<SlotSharingGroup> slotSharingGroups;
+    private final Set<CoLocationGroup> coLocationGroups;
+    private final Map<JobVertexID, JobVertex> jobVerticesById;
+    private final VertexParallelismStore vertexParallelismStore;
+
+    /**
+     * @param slotSharingGroups groups to expose; copied into a new set
+     * @param jobVertices vertices to expose; indexed by their id
+     * @param vertexParallelismStore store queried for the parallelism info of each vertex
+     */
+    public TestingJobInformation(
+            Set<SlotSharingGroup> slotSharingGroups,
+            List<JobVertex> jobVertices,
+            VertexParallelismStore vertexParallelismStore) {
+        this.vertexParallelismStore = vertexParallelismStore;
+        this.coLocationGroups = new HashSet<>();
+        this.slotSharingGroups = new HashSet<>(slotSharingGroups);
+        this.jobVerticesById =
+                jobVertices.stream()
+                        .collect(Collectors.toMap(JobVertex::getID, Function.identity()));
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return slotSharingGroups;
+    }
+
+    @Override
+    public Collection<CoLocationGroup> getCoLocationGroups() {
+        return coLocationGroups;
+    }
+
+    /** Builds a new vertex information object from the indexed vertex and the store entry. */
+    @Override
+    public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+        final JobVertex jobVertex = jobVerticesById.get(jobVertexId);
+        return new JobGraphJobInformation.JobVertexInformation(
+                jobVertex, vertexParallelismStore.getParallelismInfo(jobVertexId));
+    }
+
+    /** Returns a view that creates the vertex information of each indexed vertex on iteration. */
+    @Override
+    public Iterable<JobInformation.VertexInformation> getVertices() {
+        // Every key is the id of the vertex it maps to, so iterating the keys visits the
+        // same vertices in the same order as iterating the values.
+        return Iterables.transform(jobVerticesById.keySet(), this::getVertexInformation);
+    }
+
+    @Override
+    public VertexParallelismStore getVertexParallelismStore() {
+        return vertexParallelismStore;
+    }
+}