This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 67f0cebeea4 [FLINK-38340][runtime] Introduce the rescale timeline 
related abstraction for performing rescale operations (#27285)
67f0cebeea4 is described below

commit 67f0cebeea48e7b8c43ed02463c144edb8719eff
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu Feb 26 09:56:03 2026 +0800

    [FLINK-38340][runtime] Introduce the rescale timeline related abstraction 
for performing rescale operations (#27285)
---
 .../adaptive/timeline/DefaultRescaleTimeline.java  | 147 +++++++++++++++++++++
 .../scheduler/adaptive/timeline/Rescale.java       |  11 +-
 .../adaptive/timeline/RescaleTimeline.java         | 106 +++++++++++++++
 .../adaptive/timeline/RescalesSummary.java         | 136 +++++++++++++++++++
 .../timeline/DefaultRescaleTimelineTest.java       | 145 ++++++++++++++++++++
 .../adaptive/timeline/RescalesSummaryTest.java     |  89 +++++++++++++
 6 files changed, 633 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
new file mode 100644
index 00000000000..1a23e229ea1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
+import org.apache.flink.util.AbstractID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+/** Default implementation of {@link RescaleTimeline}. */
+public class DefaultRescaleTimeline implements RescaleTimeline {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultRescaleTimeline.class);
+
+    private final Supplier<JobInformation> jobInformationGetter;
+
+    private final BoundedFIFOQueue<Rescale> rescaleHistory;
+
+    private final Map<TerminalState, Rescale> latestRescales;
+
+    private final RescalesSummary rescalesSummary;
+
+    private RescaleIdInfo rescaleIdInfo;
+
+    /** The rescale currently tracked; {@code null} until the first rescale is created. */
+    @Nullable private Rescale currentRescale;
+
+    public DefaultRescaleTimeline(
+            Supplier<JobInformation> jobInformationGetter, int maxHistorySize) 
{
+        this.jobInformationGetter = jobInformationGetter;
+        this.rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 0L);
+        this.latestRescales = new 
ConcurrentHashMap<>(TerminalState.values().length);
+        this.rescaleHistory = new BoundedFIFOQueue<>(maxHistorySize);
+        this.rescalesSummary = new RescalesSummary(maxHistorySize);
+    }
+
+    @Nullable
+    @Override
+    public Rescale getLatestRescale(TerminalState terminalState) {
+        return latestRescales.get(terminalState);
+    }
+
+    @Nullable
+    @Override
+    public JobInformation getJobInformation() {
+        return jobInformationGetter.get();
+    }
+
+    @Override
+    public boolean isIdling() {
+        return currentRescale == null || currentRescale.isTerminated();
+    }
+
+    @Override
+    public RescaleIdInfo newRescale(boolean newRescaleEpoch) {
+        // A new rescale may only be started while idling, i.e. there is no current rescale or the
+        // current one has already reached a terminal state.
+        if (!isIdling()) {
+            String hintMsg =
+                    String.format("Rescale %s with unexpected terminal state.", currentRescale);
+            LOG.warn(hintMsg);
+            throw new IllegalStateException(hintMsg);
+        }
+        // Only roll a real predecessor: for the very first rescale there is nothing to record, and
+        // rolling a null rescale would merely log a misleading "not terminated" warning.
+        if (currentRescale != null) {
+            rollingLatestRescale();
+        }
+        currentRescale = new Rescale(nextRescaleId(newRescaleEpoch));
+        rescaleHistory.add(currentRescale);
+        rescalesSummary.addInProgress(currentRescale);
+        return currentRescale.getRescaleIdInfo();
+    }
+
+    @Override
+    public boolean updateRescale(RescaleUpdater rescaleUpdater) {
+        // Nothing can be updated while idling; this check takes precedence over the null check.
+        if (isIdling()) {
+            LOG.warn(
+                    "Current rescale {} is null or terminated, so the update action is ignored.",
+                    currentRescale);
+            return false;
+        }
+        if (Objects.isNull(rescaleUpdater)) {
+            LOG.warn(
+                    "The rescale updater is null for {}, so the null update action is ignored.",
+                    currentRescale);
+            return false;
+        }
+
+        rescaleUpdater.update(currentRescale);
+        // Roll and summarize only once the update has actually terminated the rescale, so that
+        // intermediate updates do not trigger the "not terminated" warning of the rolling step.
+        if (Rescale.isTerminated(currentRescale)) {
+            rollingLatestRescale();
+            rescalesSummary.addTerminated(currentRescale);
+        }
+        return true;
+    }
+
+    @Nullable
+    Rescale currentRescale() {
+        return currentRescale;
+    }
+
+    private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) {
+        if (newRescaleEpoch) {
+            rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 1L);
+        } else {
+            rescaleIdInfo =
+                    new RescaleIdInfo(
+                            rescaleIdInfo.getResourceRequirementsId(),
+                            rescaleIdInfo.getRescaleAttemptId() + 1L);
+        }
+        return rescaleIdInfo;
+    }
+
+    /** Records the current rescale, once terminated, as the latest for its terminal state. */
+    private void rollingLatestRescale() {
+        if (Rescale.isTerminated(currentRescale)) {
+            latestRescales.put(currentRescale.getTerminalState(), 
currentRescale);
+        } else {
+            LOG.warn(
+                    "Rescale {} is not terminated now, the rolling action is 
ignored.",
+                    currentRescale);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
index 2fc3ef21b48..6bb4f3ae102 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
@@ -195,12 +195,21 @@ public class Rescale implements Serializable {
         return terminalState;
     }
 
+    @Nullable
+    public TerminatedReason getTerminatedReason() {
+        return terminatedReason;
+    }
+
+    public RescaleIdInfo getRescaleIdInfo() {
+        return rescaleIdInfo;
+    }
+
     @Nullable
     public String getStringifiedException() {
         return stringifiedException;
     }
 
-    private boolean isTerminated() {
+    boolean isTerminated() {
         return terminalState != null;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
new file mode 100644
index 00000000000..7c9f1411939
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+
+import javax.annotation.Nullable;
+
+/**
+ * The interface for updating rescale timeline information. When rescale history is enabled for a
+ * job, an implementation is used to perform fast operations on rescaling and to keep historical
+ * records. That is, it is a timeline used to record changes in rescaling.
+ */
+@Internal
+public interface RescaleTimeline {
+
+    /** Get the job information of current job. */
+    @Nullable
+    JobInformation getJobInformation();
+
+    /**
+     * Judge whether the current rescale in the timeline is terminated or null, which means no
+     * rescale is in progress in the current phase.
+     *
+     * @return <code>true</code> if no rescale is in progress, <code>false</code> otherwise.
+     */
+    boolean isIdling();
+
+    /**
+     * Create a new rescale and assign it as the current rescale. Note that {@link #isIdling()} must
+     * be true when creating a new current rescale. When there is a rescale in the transition
+     * phase, the current rescale must be sealed before a new rescale is created.
+     *
+     * @param newRescaleEpoch whether the rescale's resource requirements are in a new
+     *     epoch.
+     * @return the {@link RescaleIdInfo} of the newly created {@link Rescale}.
+     */
+    RescaleIdInfo newRescale(boolean newRescaleEpoch);
+
+    /** Get the latest rescale for the specified terminal state. */
+    @Nullable
+    Rescale getLatestRescale(TerminalState terminalState);
+
+    /**
+     * Update the current rescale. It only makes sense to update a rescale when there is an ongoing
+     * rescale that is still transitioning between states.
+     *
+     * @param rescaleUpdater The action to update the current rescale.
+     * @return <code>true</code> if the update was applied, <code>false</code> otherwise.
+     */
+    boolean updateRescale(RescaleUpdater rescaleUpdater);
+
+    /** Rescale operation interface. */
+    interface RescaleUpdater {
+        void update(Rescale rescaleToUpdate);
+    }
+
+    /** No-op implementation of {@link RescaleTimeline}. */
+    enum NoOpRescaleTimeline implements RescaleTimeline {
+        INSTANCE;
+
+        @Override
+        public RescaleIdInfo newRescale(boolean newRescaleEpoch) {
+            return null;
+        }
+
+        @Override
+        public boolean updateRescale(RescaleUpdater rescaleUpdater) {
+            return false;
+        }
+
+        @Nullable
+        @Override
+        public Rescale getLatestRescale(TerminalState terminalState) {
+            return null;
+        }
+
+        @Nullable
+        @Override
+        public JobInformation getJobInformation() {
+            return null;
+        }
+
+        @Override
+        public boolean isIdling() {
+            return false;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
new file mode 100644
index 00000000000..6d1e2a567cb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/** Statistics summary of rescales. */
+public class RescalesSummary implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RescalesSummary.class);
+
+    private final StatsSummary allTerminatedSummary;
+    private final StatsSummary completedRescalesSummary;
+    private final StatsSummary ignoredRescalesSummary;
+    private final StatsSummary failedRescalesSummary;
+
+    // Total number of rescales registered via addInProgress, whether or not they have terminated.
+    private long totalRescalesCount = 0L;
+    private long inProgressRescalesCount = 0L;
+
+    public RescalesSummary(int maxHistorySize) {
+        this.allTerminatedSummary = new StatsSummary(maxHistorySize);
+        this.completedRescalesSummary = new StatsSummary(maxHistorySize);
+        this.ignoredRescalesSummary = new StatsSummary(maxHistorySize);
+        this.failedRescalesSummary = new StatsSummary(maxHistorySize);
+    }
+
+    /**
+     * Add a terminated rescale to the summary. Note that the method could be called after calling
+     * {@link #addInProgress(Rescale)}.
+     *
+     * @param rescale the target terminated rescale.
+     */
+    public void addTerminated(Rescale rescale) {
+        if (!Rescale.isTerminated(rescale)) {
+            LOG.warn(
+                    "Unexpected rescale: {}, which will be ignored when 
computing statistics.",
+                    rescale);
+            return;
+        }
+
+        this.allTerminatedSummary.add(rescale.getDuration().toMillis());
+        this.inProgressRescalesCount = 0;
+
+        if (rescale.getTerminalState() == null) {
+            return;
+        }
+
+        switch (rescale.getTerminalState()) {
+            case FAILED:
+                failedRescalesSummary.add(rescale.getDuration().toMillis());
+                break;
+            case COMPLETED:
+                completedRescalesSummary.add(rescale.getDuration().toMillis());
+                break;
+            case IGNORED:
+                ignoredRescalesSummary.add(rescale.getDuration().toMillis());
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Add an in-progress rescale to the summary. Note that the method could be called before
+     * {@link #addTerminated(Rescale)}.
+     *
+     * @param rescale the target non-terminated rescale.
+     */
+    public void addInProgress(Rescale rescale) {
+        if (Rescale.isTerminated(rescale)) {
+            LOG.warn("Unexpected rescale: {}, which will be ignored.", 
rescale);
+        } else {
+            inProgressRescalesCount++;
+            totalRescalesCount++;
+        }
+    }
+
+    public long getTotalRescalesCount() {
+        return totalRescalesCount;
+    }
+
+    public long getInProgressRescalesCount() {
+        return inProgressRescalesCount;
+    }
+
+    public long getCompletedRescalesCount() {
+        return completedRescalesSummary.getCount();
+    }
+
+    public long getIgnoredRescalesCount() {
+        return ignoredRescalesSummary.getCount();
+    }
+
+    public long getFailedRescalesCount() {
+        return failedRescalesSummary.getCount();
+    }
+
+    public StatsSummary getAllTerminatedSummary() {
+        return allTerminatedSummary;
+    }
+
+    public StatsSummary getCompletedRescalesSummary() {
+        return completedRescalesSummary;
+    }
+
+    public StatsSummary getIgnoredRescalesSummary() {
+        return ignoredRescalesSummary;
+    }
+
+    public StatsSummary getFailedRescalesSummary() {
+        return failedRescalesSummary;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
new file mode 100644
index 00000000000..1f0c140c705
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimelineTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultRescaleTimeline}. */
+class DefaultRescaleTimelineTest {
+
+    private RescaleTimeline rescaleTimeline;
+    private DefaultRescaleTimeline defaultRescaleTimeline;
+
+    @BeforeEach
+    void setUp() {
+        this.rescaleTimeline =
+                new DefaultRescaleTimeline(
+                        () ->
+                                new TestingJobInformation(
+                                        Collections.emptySet(),
+                                        Collections.emptyList(),
+                                        new DefaultVertexParallelismStore()),
+                        3);
+        this.defaultRescaleTimeline = (DefaultRescaleTimeline) rescaleTimeline;
+    }
+
+    @Test
+    void testIsIdling() {
+        assertThat(rescaleTimeline.isIdling()).isTrue();
+        rescaleTimeline.newRescale(true);
+        assertThat(rescaleTimeline.isIdling()).isFalse();
+
+        rescaleTimeline.updateRescale(
+                rescaleToUpdate -> 
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+        assertThat(rescaleTimeline.isIdling()).isTrue();
+    }
+
+    @Test
+    void testNewRescaleWithIdInfoGenerationLogic() {
+        // First rescale of a new epoch: the attempt id starts at 1.
+        assertThat(defaultRescaleTimeline.currentRescale()).isNull();
+        rescaleTimeline.newRescale(true);
+        Rescale rescale1 = defaultRescaleTimeline.currentRescale();
+        assertThat(rescale1).isNotNull();
+        RescaleIdInfo rescaleIdInfo1 = rescale1.getRescaleIdInfo();
+        assertThat(rescaleIdInfo1.getRescaleAttemptId()).isOne();
+
+        // Second rescale in the same epoch: same resource requirements id, next attempt id.
+        rescaleTimeline.updateRescale(r -> r.setTerminatedReason(TerminatedReason.SUCCEEDED));
+        rescaleTimeline.newRescale(false);
+        Rescale rescale2 = defaultRescaleTimeline.currentRescale();
+        // Assert non-null before dereferencing so a missing rescale fails as an assertion.
+        assertThat(rescale2).isNotNull();
+        RescaleIdInfo rescaleIdInfo2 = rescale2.getRescaleIdInfo();
+        assertThat(rescaleIdInfo2.getRescaleAttemptId()).isEqualTo(2L);
+        assertThat(rescaleIdInfo2.getResourceRequirementsId())
+                .isEqualTo(rescaleIdInfo1.getResourceRequirementsId());
+        assertThat(rescaleIdInfo2.getRescaleUuid()).isNotEqualTo(rescaleIdInfo1.getRescaleUuid());
+
+        // Third rescale opens a new epoch: fresh resource requirements id, attempt id back to 1.
+        rescaleTimeline.updateRescale(r -> r.setTerminatedReason(TerminatedReason.SUCCEEDED));
+        rescaleTimeline.newRescale(true);
+        Rescale rescale3 = defaultRescaleTimeline.currentRescale();
+        assertThat(rescale3).isNotNull();
+        RescaleIdInfo rescaleIdInfo3 = rescale3.getRescaleIdInfo();
+        assertThat(rescaleIdInfo3.getRescaleAttemptId()).isOne();
+        assertThat(rescaleIdInfo3.getResourceRequirementsId())
+                .isNotEqualTo(rescaleIdInfo1.getResourceRequirementsId())
+                .isNotEqualTo(rescaleIdInfo2.getResourceRequirementsId());
+        assertThat(rescaleIdInfo3.getRescaleUuid())
+                .isNotEqualTo(rescaleIdInfo1.getRescaleUuid())
+                .isNotEqualTo(rescaleIdInfo2.getRescaleUuid());
+    }
+
+    @Test
+    void testGetLatestRescale() {
+        
assertThat(rescaleTimeline.getLatestRescale(TerminalState.FAILED)).isNull();
+        
assertThat(rescaleTimeline.getLatestRescale(TerminalState.COMPLETED)).isNull();
+        
assertThat(rescaleTimeline.getLatestRescale(TerminalState.IGNORED)).isNull();
+
+        rescaleTimeline.newRescale(true);
+        rescaleTimeline.updateRescale(
+                rescaleToUpdate -> 
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+        Rescale firstLatestCompletedRescale = 
defaultRescaleTimeline.currentRescale();
+        assertThat(rescaleTimeline.getLatestRescale(TerminalState.COMPLETED))
+                .isNotNull()
+                .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+        rescaleTimeline.newRescale(true);
+        rescaleTimeline.updateRescale(
+                rescaleToUpdate ->
+                        
rescaleToUpdate.setTerminatedReason(TerminatedReason.EXCEPTION_OCCURRED));
+        assertThat(rescaleTimeline.getLatestRescale(TerminalState.FAILED))
+                .isNotNull()
+                .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+        rescaleTimeline.newRescale(true);
+        rescaleTimeline.updateRescale(
+                rescaleToUpdate ->
+                        
rescaleToUpdate.setTerminatedReason(TerminatedReason.JOB_FINISHED));
+        assertThat(rescaleTimeline.getLatestRescale(TerminalState.IGNORED))
+                .isNotNull()
+                .isEqualTo(defaultRescaleTimeline.currentRescale());
+
+        rescaleTimeline.newRescale(true);
+        rescaleTimeline.updateRescale(
+                rescaleToUpdate -> 
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+        assertThat(rescaleTimeline.getLatestRescale(TerminalState.COMPLETED))
+                .isNotNull()
+                .isEqualTo(defaultRescaleTimeline.currentRescale())
+                .isNotEqualTo(firstLatestCompletedRescale);
+    }
+
+    @Test
+    void testUpdateRescale() {
+        assertThat(defaultRescaleTimeline.currentRescale()).isNull();
+        rescaleTimeline.newRescale(true);
+        rescaleTimeline.updateRescale(
+                rescaleToUpdate -> 
rescaleToUpdate.setTerminatedReason(TerminatedReason.SUCCEEDED));
+        assertThat(defaultRescaleTimeline.currentRescale().getTerminalState())
+                .isNotNull()
+                .isEqualTo(TerminalState.COMPLETED);
+        
assertThat(defaultRescaleTimeline.currentRescale().getTerminatedReason())
+                .isNotNull()
+                .isEqualTo(TerminatedReason.SUCCEEDED);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
new file mode 100644
index 00000000000..4e717aee344
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummary;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RescalesSummary}. */
+class RescalesSummaryTest {
+
+    private Rescale getRescale() {
+        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale.setStartTimestamp(1L);
+        rescale.setEndTimestamp(2L);
+        return rescale;
+    }
+
+    private void assertSummary(
+            StatsSummary summary,
+            long expectedCount,
+            long expectedSum,
+            long expectedAvg,
+            long expectedMax,
+            long expectedMin) {
+        assertThat(summary.getMinimum()).isEqualTo(expectedMin);
+        assertThat(summary.getMaximum()).isEqualTo(expectedMax);
+        assertThat(summary.getAverage()).isEqualTo(expectedAvg);
+        assertThat(summary.getSum()).isEqualTo(expectedSum);
+        assertThat(summary.getCount()).isEqualTo(expectedCount);
+    }
+
+    @Test
+    void testAddInProgressAndTerminated() {
+        RescalesSummary rescalesSummary = new RescalesSummary(5);
+        Rescale rescale = getRescale();
+        rescale.setStartTimestamp(1L);
+        rescale.setEndTimestamp(2L);
+
+        // Test adding unexpected non-terminated rescale.
+        rescalesSummary.addTerminated(rescale);
+        assertThat(rescalesSummary.getTotalRescalesCount()).isZero();
+        assertSummary(rescalesSummary.getAllTerminatedSummary(), 0L, 0L, 0L, 
0L, 0L);
+
+        // Test adding unexpected terminated rescale.
+        rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+        rescalesSummary.addInProgress(rescale);
+        assertThat(rescalesSummary.getTotalRescalesCount()).isZero();
+        assertSummary(rescalesSummary.getAllTerminatedSummary(), 0L, 0L, 0L, 
0L, 0L);
+
+        // Test add in-progress rescale.
+        rescale = getRescale();
+        rescalesSummary.addInProgress(rescale);
+        assertThat(rescalesSummary.getTotalRescalesCount()).isOne();
+        assertThat(rescalesSummary.getCompletedRescalesCount()).isZero();
+        assertThat(rescalesSummary.getIgnoredRescalesCount()).isZero();
+        assertThat(rescalesSummary.getFailedRescalesCount()).isZero();
+        assertThat(rescalesSummary.getInProgressRescalesCount()).isOne();
+
+        // Test adding a completed rescale after adding an in-progress rescale.
+        rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
+        rescalesSummary.addTerminated(rescale);
+        assertThat(rescalesSummary.getTotalRescalesCount()).isOne();
+        assertThat(rescalesSummary.getCompletedRescalesCount()).isOne();
+        assertThat(rescalesSummary.getIgnoredRescalesCount()).isZero();
+        assertThat(rescalesSummary.getInProgressRescalesCount()).isZero();
+        assertThat(rescalesSummary.getFailedRescalesCount()).isZero();
+        assertSummary(rescalesSummary.getCompletedRescalesSummary(), 1L, 1L, 
1L, 1L, 1L);
+    }
+}

Reply via email to