This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 67f0cebeea4 [FLINK-38340][runtime] Introduce the rescale timeline
related abstraction for performing rescale operations (#27285)
67f0cebeea4 is described below
commit 67f0cebeea48e7b8c43ed02463c144edb8719eff
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu Feb 26 09:56:03 2026 +0800
[FLINK-38340][runtime] Introduce the rescale timeline related abstraction
for performing rescale operations (#27285)
---
.../adaptive/timeline/DefaultRescaleTimeline.java | 147 +++++++++++++++++++++
.../scheduler/adaptive/timeline/Rescale.java | 11 +-
.../adaptive/timeline/RescaleTimeline.java | 106 +++++++++++++++
.../adaptive/timeline/RescalesSummary.java | 136 +++++++++++++++++++
.../timeline/DefaultRescaleTimelineTest.java | 145 ++++++++++++++++++++
.../adaptive/timeline/RescalesSummaryTest.java | 89 +++++++++++++
6 files changed, 633 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
new file mode 100644
index 00000000000..1a23e229ea1
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
+import org.apache.flink.util.AbstractID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+/** Default implementation of {@link RescaleTimeline}. */
+public class DefaultRescaleTimeline implements RescaleTimeline {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultRescaleTimeline.class);
+
+ private final Supplier<JobInformation> jobInformationGetter;
+
+ private final BoundedFIFOQueue<Rescale> rescaleHistory;
+
+ private final Map<TerminalState, Rescale> latestRescales;
+
+ private final RescalesSummary rescalesSummary;
+
+ private RescaleIdInfo rescaleIdInfo;
+
+ /** When there are no rescales, the field would be null. */
+ @Nullable private Rescale currentRescale;
+
+ public DefaultRescaleTimeline(
+ Supplier<JobInformation> jobInformationGetter, int maxHistorySize)
{
+ this.jobInformationGetter = jobInformationGetter;
+ this.rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 0L);
+ this.latestRescales = new
ConcurrentHashMap<>(TerminalState.values().length);
+ this.rescaleHistory = new BoundedFIFOQueue<>(maxHistorySize);
+ this.rescalesSummary = new RescalesSummary(maxHistorySize);
+ }
+
+ @Nullable
+ @Override
+ public Rescale getLatestRescale(TerminalState terminalState) {
+ return latestRescales.get(terminalState);
+ }
+
+ @Nullable
+ @Override
+ public JobInformation getJobInformation() {
+ return jobInformationGetter.get();
+ }
+
+ @Override
+ public boolean isIdling() {
+ return currentRescale == null || currentRescale.isTerminated();
+ }
+
+ @Override
+ public RescaleIdInfo newRescale(boolean newRescaleEpoch) {
+ rollingLatestRescale();
+ if (!isIdling()) {
+ String hintMsg =
+ String.format("Rescale %s with unexpected terminal
state.", currentRescale);
+ LOG.warn(hintMsg);
+ throw new IllegalStateException(hintMsg);
+ }
+ currentRescale = new Rescale(nextRescaleId(newRescaleEpoch));
+ rescaleHistory.add(currentRescale);
+ rescalesSummary.addInProgress(currentRescale);
+ return currentRescale.getRescaleIdInfo();
+ }
+
+ @Override
+ public boolean updateRescale(RescaleUpdater rescaleUpdater) {
+ if (!isIdling() && Objects.nonNull(rescaleUpdater)) {
+ rescaleUpdater.update(currentRescale);
+ rollingLatestRescale();
+ if (Rescale.isTerminated(currentRescale)) {
+ rescalesSummary.addTerminated(currentRescale);
+ }
+ return true;
+ } else {
+ if (isIdling()) {
+ LOG.warn(
+ "Current rescale {} is null or terminated, so the
update action is ignored.",
+ currentRescale);
+ return false;
+ }
+ if (rescaleUpdater == null) {
+ LOG.warn(
+ "The rescale updater is null for {}, so the null
update action is ignored.",
+ currentRescale);
+ return false;
+ }
+ }
+ return false;
+ }
+
+ @Nullable
+ Rescale currentRescale() {
+ return currentRescale;
+ }
+
+ private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) {
+ if (newRescaleEpoch) {
+ rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 1L);
+ } else {
+ rescaleIdInfo =
+ new RescaleIdInfo(
+ rescaleIdInfo.getResourceRequirementsId(),
+ rescaleIdInfo.getRescaleAttemptId() + 1L);
+ }
+ return rescaleIdInfo;
+ }
+
+ /** Rolling the last rescale for the specified status. */
+ private void rollingLatestRescale() {
+ if (Rescale.isTerminated(currentRescale)) {
+ latestRescales.put(currentRescale.getTerminalState(),
currentRescale);
+ } else {
+ LOG.warn(
+ "Rescale {} is not terminated now, the rolling action is
ignored.",
+ currentRescale);
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
index 2fc3ef21b48..6bb4f3ae102 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
@@ -195,12 +195,21 @@ public class Rescale implements Serializable {
return terminalState;
}
+ @Nullable
+ public TerminatedReason getTerminatedReason() {
+ return terminatedReason;
+ }
+
+ public RescaleIdInfo getRescaleIdInfo() {
+ return rescaleIdInfo;
+ }
+
@Nullable
public String getStringifiedException() {
return stringifiedException;
}
- private boolean isTerminated() {
+ boolean isTerminated() {
return terminalState != null;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
new file mode 100644
index 00000000000..7c9f1411939
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+
+import javax.annotation.Nullable;
+
+/**
+ * The rescale timeline information updating interface. When rescale history
is enabled for a job,
+ * this class is used to perform fast operations on rescaling and to keep
historical records. That
+ * is, a timeline used to record changes in rescaling.
+ */
+@Internal
+public interface RescaleTimeline {
+
+ /** Get the job information of current job. */
+ @Nullable
+ JobInformation getJobInformation();
+
+ /**
+ * Judge whether the current rescale in timeline is terminated or null,
which represent no
+ * rescales in the current phase.
+ *
+ * @return <code>true</code> if there are no rescales to perform.
<code>false</code> else.
+ */
+ boolean isIdling();
+
+ /**
+ * Create a new rescale and assign it as current rescale. Note, the {@link
#isIdling()} must be
+ * true when creating a new current rescale. When there's a rescale in
transition phase, we must
+ * seal the current resale before creating a new rescale.
+ *
+ * @param newRescaleEpoch It represents whether the rescale resource
requirements is in the new
+ * epoch.
+ * @return the {@link RescaleIdInfo} of the new created {@link Rescale}.
+ */
+ RescaleIdInfo newRescale(boolean newRescaleEpoch);
+
+ /** Get the latest rescale for the specified terminal state. */
+ @Nullable
+ Rescale getLatestRescale(TerminalState terminalState);
+
+ /**
+ * Update the current rescale. It only makes sense to update a rescale
when there is an ongoing
+ * rescale that is in the process of transition states.
+ *
+ * @param rescaleUpdater The action to update the current rescale.
+ * @return <code>true</code> if update successfully <code>false</code>
else.
+ */
+ boolean updateRescale(RescaleUpdater rescaleUpdater);
+
+ /** Rescale operation interface. */
+ interface RescaleUpdater {
+ void update(Rescale rescaleToUpdate);
+ }
+
+ /** No-op implementation of {@link RescaleTimeline}. */
+ enum NoOpRescaleTimeline implements RescaleTimeline {
+ INSTANCE;
+
+ @Override
+ public RescaleIdInfo newRescale(boolean newRescaleEpoch) {
+ return null;
+ }
+
+ @Override
+ public boolean updateRescale(RescaleUpdater rescaleUpdater) {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public Rescale getLatestRescale(TerminalState terminalState) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public JobInformation getJobInformation() {
+ return null;
+ }
+
+ @Override
+ public boolean isIdling() {
+ return false;
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
new file mode 100644
index 00000000000..6d1e2a567cb
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/** Statistics summary of rescales. */
+public class RescalesSummary implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RescalesSummary.class);
+
+ private final StatsSummary allTerminatedSummary;
+ private final StatsSummary completedRescalesSummary;
+ private final StatsSummary ignoredRescalesSummary;
+ private final StatsSummary failedRescalesSummary;
+
+ // Total terminated count
+ private long totalRescalesCount = 0L;
+ private long inProgressRescalesCount = 0L;
+
+ public RescalesSummary(int maxHistorySize) {
+ this.allTerminatedSummary = new StatsSummary(maxHistorySize);
+ this.completedRescalesSummary = new StatsSummary(maxHistorySize);
+ this.ignoredRescalesSummary = new StatsSummary(maxHistorySize);
+ this.failedRescalesSummary = new StatsSummary(maxHistorySize);
+ }
+
+ /**
+ * Add a terminated rescale in. Note, The method could be called after
calling {@link
+ * #addInProgress(Rescale)}.
+ *
+ * @param rescale the target terminated rescale.
+ */
+ public void addTerminated(Rescale rescale) {
+ if (!Rescale.isTerminated(rescale)) {
+ LOG.warn(
+ "Unexpected rescale: {}, which will be ignored when
computing statistics.",
+ rescale);
+ return;
+ }
+
+ this.allTerminatedSummary.add(rescale.getDuration().toMillis());
+ this.inProgressRescalesCount = 0;
+
+ if (rescale.getTerminalState() == null) {
+ return;
+ }
+
+ switch (rescale.getTerminalState()) {
+ case FAILED:
+ failedRescalesSummary.add(rescale.getDuration().toMillis());
+ break;
+ case COMPLETED:
+ completedRescalesSummary.add(rescale.getDuration().toMillis());
+ break;
+ case IGNORED:
+ ignoredRescalesSummary.add(rescale.getDuration().toMillis());
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Add an in-progress rescale in. Note, The method could be called before
{@link
+ * #addTerminated(Rescale)}.
+ *
+ * @param rescale the target non-terminated rescale.
+ */
+ public void addInProgress(Rescale rescale) {
+ if (Rescale.isTerminated(rescale)) {
+ LOG.warn("Unexpected rescale: {}, which will be ignored.",
rescale);
+ } else {
+ inProgressRescalesCount++;
+ totalRescalesCount++;
+ }
+ }
+
+ public long getTotalRescalesCount() {
+ return totalRescalesCount;
+ }
+
+ public long getInProgressRescalesCount() {
+ return inProgressRescalesCount;
+ }
+
+ public long getCompletedRescalesCount() {
+ return completedRescalesSummary.getCount();
+ }
+
+ public long getIgnoredRescalesCount() {
+ return ignoredRescalesSummary.getCount();
+ }
+
+ public long getFailedRescalesCount() {
+ return failedRescalesSummary.getCount();
+ }
+
+ public StatsSummary getAllTerminatedSummary() {
+ return allTerminatedSummary;
+ }
+
+ public StatsSummary getCompletedRescalesSummary() {
+ return completedRescalesSummary;
+ }
+
+ public StatsSummary getIgnoredRescalesSummary() {
+ return ignoredRescalesSummary;
+ }
+
+ public StatsSummary getFailedRescalesSummary() {
+ return failedRescalesSummary;
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
new file mode 100644
index 00000000000..1f0c140c705
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultRescaleTimeline}. */
+class DefaultRescaleTimelineTest {
+
+ private RescaleTimeline rescaleTimeline;
+ private DefaultRescaleTimeline defaultRescaleTimeline;
+
+ @BeforeEach
+ void setUp() {
+ this.rescaleTimeline =
+ new DefaultRescaleTimeline(
+ () ->
+ new TestingJobInformation(
+ Collections.emptySet(),
+ Collections.emptyList(),
+ new DefaultVertexParallelismStore()),
+ 3);
+ this.defaultRescaleTimeline = (DefaultRescaleTimeline) rescaleTimeline;
+ }
+
+ @Test
+ void testIsIdling() {
+ assertThat(rescaleTimeline.isIdling()).isTrue();
+ rescaleTimeline.newRescale(true);
+ assertThat(rescaleTimeline.isIdling()).isFalse();
+
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ assertThat(rescaleTimeline.isIdling()).isTrue();
+ }
+
+ @Test
+ void testNewRescaleWithIdInfoGenerationLogic() {
+ assertThat(defaultRescaleTimeline.currentRescale()).isNull();
+ rescaleTimeline.newRescale(true);
+ Rescale rescale1 = defaultRescaleTimeline.currentRescale();
+ assertThat(rescale1).isNotNull();
+ RescaleIdInfo rescaleIdInfo1 = rescale1.getRescaleIdInfo();
+ assertThat(rescaleIdInfo1.getRescaleAttemptId()).isOne();
+
+ rescaleTimeline.updateRescale(r ->
r.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ rescaleTimeline.newRescale(false);
+ Rescale rescale2 = defaultRescaleTimeline.currentRescale();
+ RescaleIdInfo rescaleIdInfo2 = rescale2.getRescaleIdInfo();
+ assertThat(rescale2).isNotNull();
+ assertThat(rescaleIdInfo2.getRescaleAttemptId()).isEqualTo(2L);
+ assertThat(rescaleIdInfo2.getResourceRequirementsId())
+ .isEqualTo(rescaleIdInfo1.getResourceRequirementsId());
+
assertThat(rescaleIdInfo2.getRescaleUuid()).isNotEqualTo(rescaleIdInfo1.getRescaleUuid());
+
+ rescaleTimeline.updateRescale(r ->
r.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ rescaleTimeline.newRescale(true);
+ Rescale rescale3 = defaultRescaleTimeline.currentRescale();
+ RescaleIdInfo rescaleIdInfo3 = rescale3.getRescaleIdInfo();
+ assertThat(rescale3).isNotNull();
+ assertThat(rescaleIdInfo3.getRescaleAttemptId()).isOne();
+ assertThat(rescaleIdInfo3.getResourceRequirementsId())
+ .isNotEqualTo(rescaleIdInfo1.getResourceRequirementsId())
+ .isNotEqualTo(rescaleIdInfo2.getResourceRequirementsId());
+ assertThat(rescaleIdInfo3.getRescaleUuid())
+ .isNotEqualTo(rescaleIdInfo1.getRescaleUuid())
+ .isNotEqualTo(rescaleIdInfo2.getRescaleUuid());
+ }
+
+ @Test
+ void testGetLatestRescale() {
+
assertThat(rescaleTimeline.getLatestRescale(TerminalState.FAILED)).isNull();
+
assertThat(rescaleTimeline.getLatestRescale(TerminalState.COMPLETED)).isNull();
+
assertThat(rescaleTimeline.getLatestRescale(TerminalState.IGNORED)).isNull();
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ Rescale firstLatestCompletedRescale =
defaultRescaleTimeline.currentRescale();
+ assertThat(rescaleTimeline.getLatestRescale(TerminalState.COMPLETED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
+
rescaleToUpdate.setTerminatedReason(TerminatedReason.EXCEPTION_OCCURRED));
+ assertThat(rescaleTimeline.getLatestRescale(TerminalState.FAILED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
+
rescaleToUpdate.setTerminatedReason(TerminatedReason.JOB_FINISHED));
+ assertThat(rescaleTimeline.getLatestRescale(TerminalState.IGNORED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ assertThat(rescaleTimeline.getLatestRescale(TerminalState.COMPLETED))
+ .isNotNull()
+ .isEqualTo(defaultRescaleTimeline.currentRescale())
+ .isNotEqualTo(firstLatestCompletedRescale);
+ }
+
+ @Test
+ void testUpdateRescale() {
+ assertThat(defaultRescaleTimeline.currentRescale()).isNull();
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescaleToUpdate ->
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+ assertThat(defaultRescaleTimeline.currentRescale().getTerminalState())
+ .isNotNull()
+ .isEqualTo(TerminalState.COMPLETED);
+
assertThat(defaultRescaleTimeline.currentRescale().getTerminatedReason())
+ .isNotNull()
+ .isEqualTo(TerminatedReason.SUCCEEDED);
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
new file mode 100644
index 00000000000..4e717aee344
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummary;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RescalesSummary}. */
+class RescalesSummaryTest {
+
+ private Rescale getRescale() {
+ Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale.setStartTimestamp(1L);
+ rescale.setEndTimestamp(2L);
+ return rescale;
+ }
+
+ private void assertSummary(
+ StatsSummary summary,
+ long expectedCount,
+ long expectedSum,
+ long expectedAvg,
+ long expectedMax,
+ long expectedMin) {
+ assertThat(summary.getMinimum()).isEqualTo(expectedMin);
+ assertThat(summary.getMaximum()).isEqualTo(expectedMax);
+ assertThat(summary.getAverage()).isEqualTo(expectedAvg);
+ assertThat(summary.getSum()).isEqualTo(expectedSum);
+ assertThat(summary.getCount()).isEqualTo(expectedCount);
+ }
+
+ @Test
+ void testAddInProgressAndTerminated() {
+ RescalesSummary rescalesSummary = new RescalesSummary(5);
+ Rescale rescale = getRescale();
+ rescale.setStartTimestamp(1L);
+ rescale.setEndTimestamp(2L);
+
+ // Test adding unexpected non-terminated rescale.
+ rescalesSummary.addTerminated(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isZero();
+ assertSummary(rescalesSummary.getAllTerminatedSummary(), 0L, 0L, 0L,
0L, 0L);
+
+ // Test adding unexpected terminated rescale.
+ rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+ rescalesSummary.addInProgress(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isZero();
+ assertSummary(rescalesSummary.getAllTerminatedSummary(), 0L, 0L, 0L,
0L, 0L);
+
+ // Test add in-progress rescale.
+ rescale = getRescale();
+ rescalesSummary.addInProgress(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isOne();
+ assertThat(rescalesSummary.getCompletedRescalesCount()).isZero();
+ assertThat(rescalesSummary.getIgnoredRescalesCount()).isZero();
+ assertThat(rescalesSummary.getFailedRescalesCount()).isZero();
+ assertThat(rescalesSummary.getInProgressRescalesCount()).isOne();
+
+ // Test add a completed rescale after adding a in-progress rescale.
+ rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+ rescalesSummary.addTerminated(rescale);
+ assertThat(rescalesSummary.getTotalRescalesCount()).isOne();
+ assertThat(rescalesSummary.getCompletedRescalesCount()).isOne();
+ assertThat(rescalesSummary.getIgnoredRescalesCount()).isZero();
+ assertThat(rescalesSummary.getInProgressRescalesCount()).isZero();
+ assertThat(rescalesSummary.getFailedRescalesCount()).isZero();
+ assertSummary(rescalesSummary.getCompletedRescalesSummary(), 1L, 1L,
1L, 1L, 1L);
+ }
+}