This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 42e25939e4a [FLINK-36871][runtime] add rescale metrics in scheduler (#25770) 42e25939e4a is described below commit 42e25939e4ae4e2aa70c122e268cc4cd5dd6eb41 Author: Peter Huang <huangzhenqiu0...@gmail.com> AuthorDate: Thu Jan 9 02:23:08 2025 -0800 [FLINK-36871][runtime] add rescale metrics in scheduler (#25770) --- docs/content.zh/docs/ops/metrics.md | 7 +- docs/content/docs/ops/metrics.md | 7 +- .../apache/flink/runtime/metrics/MetricNames.java | 1 + .../flink/runtime/scheduler/DefaultScheduler.java | 6 ++ .../flink/runtime/scheduler/SchedulerBase.java | 5 + .../scheduler/adaptive/AdaptiveScheduler.java | 7 ++ .../scheduler/adaptive/AdaptiveSchedulerTest.java | 76 ++++++++++++++- .../scheduler/adaptive/MockRestartingContext.java | 104 +++++++++++++++++++++ .../runtime/scheduler/adaptive/RestartingTest.java | 76 --------------- 9 files changed, 209 insertions(+), 80 deletions(-) diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md index cd9369d0ff3..df901d3bddb 100644 --- a/docs/content.zh/docs/ops/metrics.md +++ b/docs/content.zh/docs/ops/metrics.md @@ -1271,7 +1271,12 @@ Whether these metrics are reported depends on the [metrics.job.status.enable]({{ </tr> <tr> <td>numRestarts</td> - <td>The total number of restarts since this job was submitted, including full restarts and fine-grained restarts.</td> + <td>The total number of restarts since this job was submitted, including full restarts, fine-grained restarts and restarts triggered by rescaling.</td> + <td>Gauge</td> + </tr> + <tr> + <td>numRescales</td> + <td>The total number of restarts triggered by rescaling, including scale up and scale down.</td> <td>Gauge</td> </tr> </tbody> diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md index c316bf15968..74f412832b9 100644 --- a/docs/content/docs/ops/metrics.md +++ b/docs/content/docs/ops/metrics.md @@ -1262,7 +1262,12 @@ Whether these metrics are reported depends on the [metrics.job.status.enable]({{ </tr> <tr> <td>numRestarts</td> - <td>The total number of restarts since this job was submitted, including full restarts and fine-grained restarts.</td> + <td>The total number of restarts since this job was submitted, including full restarts, fine-grained restarts and restarts triggered by rescaling.</td> + <td>Gauge</td> + </tr> + <tr> + <td>numRescales</td> + <td>The total number of restarts triggered by rescaling, including scale up and scale down.</td> <td>Gauge</td> </tr> </tbody> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index a0eca1fde2a..96e826846ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -52,6 +52,7 @@ public class MetricNames { public static final String NUM_PENDING_TASK_MANAGERS = "numPendingTaskManagers"; public static final String NUM_RESTARTS = "numRestarts"; + public static final String NUM_RESCALES = "numRescales"; public static final String MEMORY_USED = "Used"; public static final String MEMORY_COMMITTED = "Committed"; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 0bbddd3317d..38bdaed8c2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -226,6 +226,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio return executionFailureHandler.getNumberOfRestarts(); } + @Override + protected long getNumberOfRescales() { + // It is always 0 for DefaultScheduler. + return 0; + } + @Override protected void cancelAllPendingSlotRequestsInternal() { getSchedulingTopology() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index dc2db52b405..80aab984bfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -604,6 +604,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling protected abstract long getNumberOfRestarts(); + protected abstract long getNumberOfRescales(); + protected MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() { // blocking partition always need mark finished. return ResultPartitionType::isBlockingOrBlockingPersistentResultPartition; @@ -650,6 +652,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling jobManagerJobMetricGroup, executionGraph, this::getNumberOfRestarts, + this::getNumberOfRescales, deploymentStateTimeMetrics, executionGraph::registerJobStatusListener, executionGraph.getStatusTimestamp(JobStatus.INITIALIZING), @@ -662,6 +665,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling MetricGroup metrics, JobStatusProvider jobStatusProvider, Gauge<Long> numberOfRestarts, + Gauge<Long> numberOfRescales, DeploymentStateTimeMetrics deploymentTimeMetrics, Consumer<JobStatusListener> jobStatusListenerRegistrar, long initializationTimestamp, @@ -669,6 +673,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(jobStatusProvider)); metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(jobStatusProvider)); metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts::getValue); + metrics.gauge(MetricNames.NUM_RESCALES, numberOfRescales::getValue); final JobStatusMetrics jobStatusMetrics = new JobStatusMetrics(initializationTimestamp, jobStatusMetricsSettings); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 4e99580545b..31505241fc1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -404,6 +404,8 @@ public class AdaptiveScheduler private int numRestarts = 0; + private int numRescales = 0; + private final MutableVertexAttemptNumberStore vertexAttemptNumberStore = new DefaultVertexAttemptNumberStore(); @@ -557,6 +559,7 @@ public class AdaptiveScheduler jobManagerJobMetricGroup, jobStatusStore, () -> (long) numRestarts, + () -> (long) numRescales, deploymentTimeMetrics, tmpJobStatusListeners::add, initializationTimestamp, @@ -1267,7 +1270,11 @@ public class AdaptiveScheduler forcedRestart, userCodeClassLoader, failureCollection)); + numRestarts++; + if (failureCollection.isEmpty()) { + numRescales++; + } } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 3a223cb4b8c..c3a77d26fcd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -92,6 +92,8 @@ import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.operators.coordination.TestOperatorEvent; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.scheduler.DefaultSchedulerTest; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; @@ -592,13 +594,16 @@ public class AdaptiveSchedulerTest { } @Test - void testNumRestartsMetric() throws Exception { + void testNumRescalesAndStartsMetricForRescale() throws Exception { + final CompletableFuture<Gauge<Long>> numRescalesMetricFuture = new CompletableFuture<>(); final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new CompletableFuture<>(); final MetricRegistry metricRegistry = TestingMetricRegistry.builder() .setRegisterConsumer( (metric, name, group) -> { - if (MetricNames.NUM_RESTARTS.equals(name)) { + if (MetricNames.NUM_RESCALES.equals(name)) { + numRescalesMetricFuture.complete((Gauge<Long>) metric); + } else if (MetricNames.NUM_RESTARTS.equals(name)) { numRestartsMetricFuture.complete((Gauge<Long>) metric); } }) @@ -629,6 +634,7 @@ public class AdaptiveSchedulerTest { .setDeclarativeSlotPool(declarativeSlotPool) .build(); + final Gauge<Long> numRescalesMetric = numRescalesMetricFuture.get(); final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get(); final SubmissionBufferingTaskManagerGateway taskManagerGateway = @@ -649,6 +655,7 @@ public class AdaptiveSchedulerTest { // wait for the first task submission taskManagerGateway.waitForSubmissions(1); + assertThat(numRescalesMetric.getValue()).isEqualTo(0L); assertThat(numRestartsMetric.getValue()).isEqualTo(0L); // offer more slots, which will cause a restart in order to scale up @@ -664,9 +671,74 @@ public class AdaptiveSchedulerTest { // wait for the second task submissions taskManagerGateway.waitForSubmissions(PARALLELISM); + assertThat(numRescalesMetric.getValue()).isEqualTo(1L); assertThat(numRestartsMetric.getValue()).isEqualTo(1L); } + @Test + void testNumRescalesAndStartsMetricForFailureRecovery() throws Exception { + final CompletableFuture<Gauge<Long>> numRescalesMetricFuture = new CompletableFuture<>(); + final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new CompletableFuture<>(); + final MetricRegistry metricRegistry = + TestingMetricRegistry.builder() + .setRegisterConsumer( + (metric, name, group) -> { + if (MetricNames.NUM_RESCALES.equals(name)) { + numRescalesMetricFuture.complete((Gauge<Long>) metric); + } else if (MetricNames.NUM_RESTARTS.equals(name)) { + numRestartsMetricFuture.complete((Gauge<Long>) metric); + } + }) + .build(); + + scheduler = + new AdaptiveSchedulerBuilder( + createJobGraph(), + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setJobMasterConfiguration(createConfigurationWithNoTimeouts()) + .setJobManagerJobMetricGroup( + JobManagerMetricGroup.createJobManagerMetricGroup( + metricRegistry, "localhost") + .addJob(new JobID(), "jobName")) + .build(); + + try (MockRestartingContext ctx = new MockRestartingContext()) { + StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph(); + + final ExecutionGraphHandler executionGraphHandler = + new ExecutionGraphHandler( + executionGraph, + LOG, + ctx.getMainThreadExecutor(), + ctx.getMainThreadExecutor()); + final OperatorCoordinatorHandler operatorCoordinatorHandler = + new TestingOperatorCoordinatorHandler(); + executionGraph.transitionToRunning(); + + List<ExceptionHistoryEntry> failureCollection = + List.of( + ExceptionHistoryEntry.createGlobal( + new Exception("test"), + CompletableFuture.completedFuture(Collections.emptyMap()))); + runInMainThread( + () -> + scheduler.goToRestarting( + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + Duration.ZERO, + true, + failureCollection)); + } + + final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get(); + assertThat(numRestartsMetric.getValue()).isEqualTo(1L); + + final Gauge<Long> numScalesMetric = numRescalesMetricFuture.get(); + assertThat(numScalesMetric.getValue()).isEqualTo(0L); + } + @Test void testStatusMetrics() throws Exception { final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java new file mode 100644 index 00000000000..a507c9d0b63 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.core.testutils.CompletedScheduledFuture; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Consumer; + +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; + +/** Mock the {@link StateWithExecutionGraph.Context} for restarting state. */ +class MockRestartingContext extends MockStateWithExecutionGraphContext + implements Restarting.Context { + + private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator = + new StateValidator<>("Cancelling"); + + private final StateValidator<ExecutionGraph> waitingForResourcesStateValidator = + new StateValidator<>("WaitingForResources"); + + private final StateValidator<ExecutionGraph> creatingExecutionGraphStateValidator = + new StateValidator<>("CreatingExecutionGraph"); + + public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) { + cancellingStateValidator.expectInput(asserter); + } + + public void setExpectWaitingForResources() { + waitingForResourcesStateValidator.expectInput(assertNonNull()); + } + + public void setExpectCreatingExecutionGraph() { + creatingExecutionGraphStateValidator.expectInput(assertNonNull()); + } + + @Override + public void goToCanceling( + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + List<ExceptionHistoryEntry> failureCollection) { + cancellingStateValidator.validateInput( + new ExecutingTest.CancellingArguments( + executionGraph, executionGraphHandler, operatorCoordinatorHandler)); + hadStateTransition = true; + } + + @Override + public void archiveFailure(RootExceptionHistoryEntry failure) {} + + @Override + public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { + waitingForResourcesStateValidator.validateInput(previousExecutionGraph); + hadStateTransition = true; + } + + @Override + public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { + creatingExecutionGraphStateValidator.validateInput(previousExecutionGraph); + hadStateTransition = true; + } + + @Override + public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) { + if (!hadStateTransition) { + action.run(); + } + return CompletedScheduledFuture.create(null); + } + + @Override + public void close() throws Exception { + super.close(); + cancellingStateValidator.close(); + waitingForResourcesStateValidator.close(); + creatingExecutionGraphStateValidator.close(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index 346f09b00eb..1686d90b0db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -19,13 +19,10 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.core.testutils.CompletedScheduledFuture; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.failure.FailureEnricherUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; -import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; -import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -33,13 +30,8 @@ import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.time.Duration; import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import java.util.function.Consumer; import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; import static org.assertj.core.api.Assertions.assertThat; @@ -184,72 +176,4 @@ class RestartingTest { public Restarting createRestartingState(MockRestartingContext ctx) { return createRestartingState(ctx, new StateTrackingMockExecutionGraph()); } - - private static class MockRestartingContext extends MockStateWithExecutionGraphContext - implements Restarting.Context { - - private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator = - new StateValidator<>("Cancelling"); - - private final StateValidator<ExecutionGraph> waitingForResourcesStateValidator = - new StateValidator<>("WaitingForResources"); - - private final StateValidator<ExecutionGraph> creatingExecutionGraphStateValidator = - new StateValidator<>("CreatingExecutionGraph"); - - public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) { - cancellingStateValidator.expectInput(asserter); - } - - public void setExpectWaitingForResources() { - waitingForResourcesStateValidator.expectInput(assertNonNull()); - } - - public void setExpectCreatingExecutionGraph() { - creatingExecutionGraphStateValidator.expectInput(assertNonNull()); - } - - @Override - public void goToCanceling( - ExecutionGraph executionGraph, - ExecutionGraphHandler executionGraphHandler, - OperatorCoordinatorHandler operatorCoordinatorHandler, - List<ExceptionHistoryEntry> failureCollection) { - cancellingStateValidator.validateInput( - new ExecutingTest.CancellingArguments( - executionGraph, executionGraphHandler, operatorCoordinatorHandler)); - hadStateTransition = true; - } - - @Override - public void archiveFailure(RootExceptionHistoryEntry failure) {} - - @Override - public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { - waitingForResourcesStateValidator.validateInput(previousExecutionGraph); - hadStateTransition = true; - } - - @Override - public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { - creatingExecutionGraphStateValidator.validateInput(previousExecutionGraph); - hadStateTransition = true; - } - - @Override - public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) { - if (!hadStateTransition) { - action.run(); - } - return CompletedScheduledFuture.create(null); - } - - @Override - public void close() throws Exception { - super.close(); - cancellingStateValidator.close(); - waitingForResourcesStateValidator.close(); - creatingExecutionGraphStateValidator.close(); - } - } }