This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 42e25939e4a [FLINK-36871][runtime] add rescale metrics in scheduler (#25770)
42e25939e4a is described below

commit 42e25939e4ae4e2aa70c122e268cc4cd5dd6eb41
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Thu Jan 9 02:23:08 2025 -0800

    [FLINK-36871][runtime] add rescale metrics in scheduler (#25770)
---
 docs/content.zh/docs/ops/metrics.md                |   7 +-
 docs/content/docs/ops/metrics.md                   |   7 +-
 .../apache/flink/runtime/metrics/MetricNames.java  |   1 +
 .../flink/runtime/scheduler/DefaultScheduler.java  |   6 ++
 .../flink/runtime/scheduler/SchedulerBase.java     |   5 +
 .../scheduler/adaptive/AdaptiveScheduler.java      |   7 ++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  76 ++++++++++++++-
 .../scheduler/adaptive/MockRestartingContext.java  | 104 +++++++++++++++++++++
 .../runtime/scheduler/adaptive/RestartingTest.java |  76 ---------------
 9 files changed, 209 insertions(+), 80 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md 
b/docs/content.zh/docs/ops/metrics.md
index cd9369d0ff3..df901d3bddb 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1271,7 +1271,12 @@ Whether these metrics are reported depends on the 
[metrics.job.status.enable]({{
     </tr>
     <tr>
       <td>numRestarts</td>
-      <td>The total number of restarts since this job was submitted, including 
full restarts and fine-grained restarts.</td>
+      <td>The total number of restarts since this job was submitted, including 
full restarts, fine-grained restarts and restarts triggered by rescaling.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>numRescales</td>
+      <td>The total number of restarts triggered by rescaling, including scale 
up and scale down.</td>
       <td>Gauge</td>
     </tr>
   </tbody>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index c316bf15968..74f412832b9 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1262,7 +1262,12 @@ Whether these metrics are reported depends on the 
[metrics.job.status.enable]({{
     </tr>
     <tr>
       <td>numRestarts</td>
-      <td>The total number of restarts since this job was submitted, including 
full restarts and fine-grained restarts.</td>
+      <td>The total number of restarts since this job was submitted, including 
full restarts, fine-grained restarts and restarts triggered by rescaling.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>numRescales</td>
+      <td>The total number of restarts triggered by rescaling, including scale 
up and scale down.</td>
       <td>Gauge</td>
     </tr>
   </tbody>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index a0eca1fde2a..96e826846ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -52,6 +52,7 @@ public class MetricNames {
     public static final String NUM_PENDING_TASK_MANAGERS = 
"numPendingTaskManagers";
 
     public static final String NUM_RESTARTS = "numRestarts";
+    public static final String NUM_RESCALES = "numRescales";
 
     public static final String MEMORY_USED = "Used";
     public static final String MEMORY_COMMITTED = "Committed";
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 0bbddd3317d..38bdaed8c2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -226,6 +226,12 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         return executionFailureHandler.getNumberOfRestarts();
     }
 
+    @Override
+    protected long getNumberOfRescales() {
+        // The rescale count is a constant 0 here: DefaultScheduler does not track rescales.
+        return 0;
+    }
+
     @Override
     protected void cancelAllPendingSlotRequestsInternal() {
         getSchedulingTopology()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index dc2db52b405..80aab984bfd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -604,6 +604,8 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
 
     protected abstract long getNumberOfRestarts();
 
+    protected abstract long getNumberOfRescales();
+
     protected MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() 
{
         // blocking partition always need mark finished.
         return 
ResultPartitionType::isBlockingOrBlockingPersistentResultPartition;
@@ -650,6 +652,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
                 jobManagerJobMetricGroup,
                 executionGraph,
                 this::getNumberOfRestarts,
+                this::getNumberOfRescales,
                 deploymentStateTimeMetrics,
                 executionGraph::registerJobStatusListener,
                 executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
@@ -662,6 +665,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
             MetricGroup metrics,
             JobStatusProvider jobStatusProvider,
             Gauge<Long> numberOfRestarts,
+            Gauge<Long> numberOfRescales,
             DeploymentStateTimeMetrics deploymentTimeMetrics,
             Consumer<JobStatusListener> jobStatusListenerRegistrar,
             long initializationTimestamp,
@@ -669,6 +673,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
         metrics.gauge(DownTimeGauge.METRIC_NAME, new 
DownTimeGauge(jobStatusProvider));
         metrics.gauge(UpTimeGauge.METRIC_NAME, new 
UpTimeGauge(jobStatusProvider));
         metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts::getValue);
+        metrics.gauge(MetricNames.NUM_RESCALES, numberOfRescales::getValue);
 
         final JobStatusMetrics jobStatusMetrics =
                 new JobStatusMetrics(initializationTimestamp, 
jobStatusMetricsSettings);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4e99580545b..31505241fc1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -404,6 +404,8 @@ public class AdaptiveScheduler
 
     private int numRestarts = 0;
 
+    private int numRescales = 0;
+
     private final MutableVertexAttemptNumberStore vertexAttemptNumberStore =
             new DefaultVertexAttemptNumberStore();
 
@@ -557,6 +559,7 @@ public class AdaptiveScheduler
                 jobManagerJobMetricGroup,
                 jobStatusStore,
                 () -> (long) numRestarts,
+                () -> (long) numRescales,
                 deploymentTimeMetrics,
                 tmpJobStatusListeners::add,
                 initializationTimestamp,
@@ -1267,7 +1270,11 @@ public class AdaptiveScheduler
                         forcedRestart,
                         userCodeClassLoader,
                         failureCollection));
+
         numRestarts++;
+        if (failureCollection.isEmpty()) {
+            numRescales++;
+        }
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 3a223cb4b8c..c3a77d26fcd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -92,6 +92,8 @@ import 
org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
@@ -592,13 +594,16 @@ public class AdaptiveSchedulerTest {
     }
 
     @Test
-    void testNumRestartsMetric() throws Exception {
+    void testNumRescalesAndStartsMetricForRescale() throws Exception {
+        final CompletableFuture<Gauge<Long>> numRescalesMetricFuture = new 
CompletableFuture<>();
         final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new 
CompletableFuture<>();
         final MetricRegistry metricRegistry =
                 TestingMetricRegistry.builder()
                         .setRegisterConsumer(
                                 (metric, name, group) -> {
-                                    if (MetricNames.NUM_RESTARTS.equals(name)) 
{
+                                    if (MetricNames.NUM_RESCALES.equals(name)) 
{
+                                        
numRescalesMetricFuture.complete((Gauge<Long>) metric);
+                                    } else if 
(MetricNames.NUM_RESTARTS.equals(name)) {
                                         
numRestartsMetricFuture.complete((Gauge<Long>) metric);
                                     }
                                 })
@@ -629,6 +634,7 @@ public class AdaptiveSchedulerTest {
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
+        final Gauge<Long> numRescalesMetric = numRescalesMetricFuture.get();
         final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get();
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
@@ -649,6 +655,7 @@ public class AdaptiveSchedulerTest {
         // wait for the first task submission
         taskManagerGateway.waitForSubmissions(1);
 
+        assertThat(numRescalesMetric.getValue()).isEqualTo(0L);
         assertThat(numRestartsMetric.getValue()).isEqualTo(0L);
 
         // offer more slots, which will cause a restart in order to scale up
@@ -664,9 +671,74 @@ public class AdaptiveSchedulerTest {
         // wait for the second task submissions
         taskManagerGateway.waitForSubmissions(PARALLELISM);
 
+        assertThat(numRescalesMetric.getValue()).isEqualTo(1L);
         assertThat(numRestartsMetric.getValue()).isEqualTo(1L);
     }
 
+    @Test
+    void testNumRescalesAndStartsMetricForFailureRecovery() throws Exception {
+        final CompletableFuture<Gauge<Long>> numRescalesMetricFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new 
CompletableFuture<>();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    if (MetricNames.NUM_RESCALES.equals(name)) 
{
+                                        
numRescalesMetricFuture.complete((Gauge<Long>) metric);
+                                    } else if 
(MetricNames.NUM_RESTARTS.equals(name)) {
+                                        
numRestartsMetricFuture.complete((Gauge<Long>) metric);
+                                    }
+                                })
+                        .build();
+
+        scheduler =
+                new AdaptiveSchedulerBuilder(
+                                createJobGraph(),
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        
.setJobMasterConfiguration(createConfigurationWithNoTimeouts())
+                        .setJobManagerJobMetricGroup(
+                                
JobManagerMetricGroup.createJobManagerMetricGroup(
+                                                metricRegistry, "localhost")
+                                        .addJob(new JobID(), "jobName"))
+                        .build();
+
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            StateTrackingMockExecutionGraph executionGraph = new 
StateTrackingMockExecutionGraph();
+
+            final ExecutionGraphHandler executionGraphHandler =
+                    new ExecutionGraphHandler(
+                            executionGraph,
+                            LOG,
+                            ctx.getMainThreadExecutor(),
+                            ctx.getMainThreadExecutor());
+            final OperatorCoordinatorHandler operatorCoordinatorHandler =
+                    new TestingOperatorCoordinatorHandler();
+            executionGraph.transitionToRunning();
+
+            List<ExceptionHistoryEntry> failureCollection =
+                    List.of(
+                            ExceptionHistoryEntry.createGlobal(
+                                    new Exception("test"),
+                                    
CompletableFuture.completedFuture(Collections.emptyMap())));
+            runInMainThread(
+                    () ->
+                            scheduler.goToRestarting(
+                                    executionGraph,
+                                    executionGraphHandler,
+                                    operatorCoordinatorHandler,
+                                    Duration.ZERO,
+                                    true,
+                                    failureCollection));
+        }
+
+        final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get();
+        assertThat(numRestartsMetric.getValue()).isEqualTo(1L);
+
+        final Gauge<Long> numScalesMetric = numRescalesMetricFuture.get();
+        assertThat(numScalesMetric.getValue()).isEqualTo(0L);
+    }
+
     @Test
     void testStatusMetrics() throws Exception {
         final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new 
CompletableFuture<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
new file mode 100644
index 00000000000..a507c9d0b63
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.core.testutils.CompletedScheduledFuture;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
+import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
+
+/** Mock implementation of {@link Restarting.Context} for tests of the restarting state. */
+class MockRestartingContext extends MockStateWithExecutionGraphContext
+        implements Restarting.Context {
+
+    private final StateValidator<ExecutingTest.CancellingArguments> 
cancellingStateValidator =
+            new StateValidator<>("Cancelling");
+
+    private final StateValidator<ExecutionGraph> 
waitingForResourcesStateValidator =
+            new StateValidator<>("WaitingForResources");
+
+    private final StateValidator<ExecutionGraph> 
creatingExecutionGraphStateValidator =
+            new StateValidator<>("CreatingExecutionGraph");
+
+    public void 
setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
+        cancellingStateValidator.expectInput(asserter);
+    }
+
+    public void setExpectWaitingForResources() {
+        waitingForResourcesStateValidator.expectInput(assertNonNull());
+    }
+
+    public void setExpectCreatingExecutionGraph() {
+        creatingExecutionGraphStateValidator.expectInput(assertNonNull());
+    }
+
+    @Override
+    public void goToCanceling(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            List<ExceptionHistoryEntry> failureCollection) {
+        cancellingStateValidator.validateInput(
+                new ExecutingTest.CancellingArguments(
+                        executionGraph, executionGraphHandler, 
operatorCoordinatorHandler));
+        hadStateTransition = true;
+    }
+
+    @Override
+    public void archiveFailure(RootExceptionHistoryEntry failure) {}
+
+    @Override
+    public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
+        
waitingForResourcesStateValidator.validateInput(previousExecutionGraph);
+        hadStateTransition = true;
+    }
+
+    @Override
+    public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
+        
creatingExecutionGraphStateValidator.validateInput(previousExecutionGraph);
+        hadStateTransition = true;
+    }
+
+    @Override
+    public ScheduledFuture<?> runIfState(State expectedState, Runnable action, 
Duration delay) {
+        if (!hadStateTransition) {
+            action.run();
+        }
+        return CompletedScheduledFuture.create(null);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        cancellingStateValidator.close();
+        waitingForResourcesStateValidator.close();
+        creatingExecutionGraphStateValidator.close();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index 346f09b00eb..1686d90b0db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -19,13 +19,10 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.core.testutils.CompletedScheduledFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
-import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
-import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -33,13 +30,8 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -184,72 +176,4 @@ class RestartingTest {
     public Restarting createRestartingState(MockRestartingContext ctx) {
         return createRestartingState(ctx, new 
StateTrackingMockExecutionGraph());
     }
-
-    private static class MockRestartingContext extends 
MockStateWithExecutionGraphContext
-            implements Restarting.Context {
-
-        private final StateValidator<ExecutingTest.CancellingArguments> 
cancellingStateValidator =
-                new StateValidator<>("Cancelling");
-
-        private final StateValidator<ExecutionGraph> 
waitingForResourcesStateValidator =
-                new StateValidator<>("WaitingForResources");
-
-        private final StateValidator<ExecutionGraph> 
creatingExecutionGraphStateValidator =
-                new StateValidator<>("CreatingExecutionGraph");
-
-        public void 
setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
-            cancellingStateValidator.expectInput(asserter);
-        }
-
-        public void setExpectWaitingForResources() {
-            waitingForResourcesStateValidator.expectInput(assertNonNull());
-        }
-
-        public void setExpectCreatingExecutionGraph() {
-            creatingExecutionGraphStateValidator.expectInput(assertNonNull());
-        }
-
-        @Override
-        public void goToCanceling(
-                ExecutionGraph executionGraph,
-                ExecutionGraphHandler executionGraphHandler,
-                OperatorCoordinatorHandler operatorCoordinatorHandler,
-                List<ExceptionHistoryEntry> failureCollection) {
-            cancellingStateValidator.validateInput(
-                    new ExecutingTest.CancellingArguments(
-                            executionGraph, executionGraphHandler, 
operatorCoordinatorHandler));
-            hadStateTransition = true;
-        }
-
-        @Override
-        public void archiveFailure(RootExceptionHistoryEntry failure) {}
-
-        @Override
-        public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
-            
waitingForResourcesStateValidator.validateInput(previousExecutionGraph);
-            hadStateTransition = true;
-        }
-
-        @Override
-        public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
-            
creatingExecutionGraphStateValidator.validateInput(previousExecutionGraph);
-            hadStateTransition = true;
-        }
-
-        @Override
-        public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
-            if (!hadStateTransition) {
-                action.run();
-            }
-            return CompletedScheduledFuture.create(null);
-        }
-
-        @Override
-        public void close() throws Exception {
-            super.close();
-            cancellingStateValidator.close();
-            waitingForResourcesStateValidator.close();
-            creatingExecutionGraphStateValidator.close();
-        }
-    }
 }

Reply via email to