This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4e3b6646f389eeb395a4dbb951d13bab02cb8db
Author: David Moravek <[email protected]>
AuthorDate: Mon Feb 27 19:08:37 2023 +0100

    [FLINK-31470] Add integration tests for Externalized Declarative Resource 
Management.
    
    Signed-off-by: David Moravek <[email protected]>
---
 .../client/program/rest/RestClusterClient.java     |  36 ++++
 .../flink/runtime/minicluster/MiniCluster.java     |  28 +--
 ...pdateJobResourceRequirementsRecoveryITCase.java | 115 ++++++++++
 .../UpdateJobResourceRequirementsITCase.java       | 235 +++++++++++++++++++++
 4 files changed, 402 insertions(+), 12 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a05350d5e58..c81619a97bf 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesF
 import 
org.apache.flink.runtime.highavailability.DefaultClientHighAvailabilityServicesFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -50,6 +51,8 @@ import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -79,6 +82,8 @@ import 
org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import 
org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
@@ -802,6 +807,37 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
         return resultFuture;
     }
 
+    /**
+     * Update {@link JobResourceRequirements} of a given job.
+     *
+     * @param jobId jobId specifies the job for which to change the resource 
requirements
+     * @param jobResourceRequirements new resource requirements for the 
provided job
+     * @return Future which is completed upon successful operation.
+     */
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        return sendRequest(
+                        JobResourcesRequirementsUpdateHeaders.INSTANCE,
+                        params,
+                        new 
JobResourceRequirementsBody(jobResourceRequirements))
+                .thenApply(ignored -> Acknowledge.get());
+    }
+
+    /**
+     * Get an overview of the Flink cluster.
+     *
+     * @return Future with the {@link ClusterOverviewWithVersion cluster 
overview}.
+     */
+    public CompletableFuture<ClusterOverviewWithVersion> getClusterOverview() {
+        return sendRequest(
+                ClusterOverviewHeaders.getInstance(),
+                EmptyMessageParameters.getInstance(),
+                EmptyRequestBody.getInstance());
+    }
+
     // ======================================
     // Legacy stuff we actually implement
     // ======================================
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1cb0a52213f..7f39ab1ae0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -648,6 +648,14 @@ public class MiniCluster implements AutoCloseableAsync {
      */
     @Override
     public CompletableFuture<Void> closeAsync() {
+        return closeInternal(true);
+    }
+
+    public CompletableFuture<Void> 
closeAsyncWithoutCleaningHighAvailabilityData() {
+        return closeInternal(false);
+    }
+
+    private CompletableFuture<Void> closeInternal(boolean cleanupHaData) {
         synchronized (lock) {
             if (running) {
                 LOG.info("Shutting down Flink Mini Cluster");
@@ -678,7 +686,7 @@ public class MiniCluster implements AutoCloseableAsync {
                     final CompletableFuture<Void> 
remainingServicesTerminationFuture =
                             FutureUtils.runAfterwards(
                                     rpcServicesTerminationFuture,
-                                    this::terminateMiniClusterServices);
+                                    () -> 
terminateMiniClusterServices(cleanupHaData));
 
                     final CompletableFuture<Void> executorsTerminationFuture =
                             FutureUtils.composeAfterwards(
@@ -1008,7 +1016,7 @@ public class MiniCluster implements AutoCloseableAsync {
         // When MiniCluster uses the local RPC, the provided JobGraph is 
passed directly to the
         // Dispatcher. This means that any mutations to the JG can affect the 
Dispatcher behaviour,
         // so we rather clone it to guard against this.
-        final JobGraph clonedJobGraph = cloneJobGraph(jobGraph);
+        final JobGraph clonedJobGraph = 
InstantiationUtil.cloneUnchecked(jobGraph);
         checkRestoreModeForChangelogStateBackend(clonedJobGraph);
         final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture =
                 getDispatcherGatewayFuture();
@@ -1240,7 +1248,7 @@ public class MiniCluster implements AutoCloseableAsync {
                 });
     }
 
-    private void terminateMiniClusterServices() throws Exception {
+    private void terminateMiniClusterServices(boolean cleanupHaData) throws 
Exception {
         // collect the first exception, but continue and add all successive
         // exceptions as suppressed
         Exception exception = null;
@@ -1268,7 +1276,11 @@ public class MiniCluster implements AutoCloseableAsync {
             // shut down high-availability services
             if (haServices != null) {
                 try {
-                    haServices.closeAndCleanupAllData();
+                    if (cleanupHaData) {
+                        haServices.closeAndCleanupAllData();
+                    } else {
+                        haServices.close();
+                    }
                 } catch (Exception e) {
                     exception = ExceptionUtils.firstOrSuppressed(e, exception);
                 }
@@ -1329,14 +1341,6 @@ public class MiniCluster implements AutoCloseableAsync {
         }
     }
 
-    private static JobGraph cloneJobGraph(JobGraph jobGraph) {
-        try {
-            return InstantiationUtil.clone(jobGraph);
-        } catch (ClassNotFoundException | IOException e) {
-            throw new IllegalStateException("Unable to clone the provided 
JobGraph.", e);
-        }
-    }
-
     public CompletableFuture<Void> invalidateClusterDataset(AbstractID 
clusterDatasetId) {
         return resourceManagerGatewayRetriever
                 .getFuture()
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java
new file mode 100644
index 00000000000..d3d673a834d
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+
+/** Tests for recovery of rescaled jobs. */
+@ExtendWith(TestLoggerExtension.class)
+class UpdateJobResourceRequirementsRecoveryITCase {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(UpdateJobResourceRequirementsRecoveryITCase.class);
+
+    @RegisterExtension
+    private static final AllCallbackWrapper<ZooKeeperExtension> 
ZOOKEEPER_EXTENSION =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    /** Tests that a rescaled job graph will be recovered with the latest 
parallelism. */
+    @Test
+    void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path 
tmpFolder) throws Exception {
+        final JobVertex jobVertex = new JobVertex("operator");
+        jobVertex.setParallelism(1);
+        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
+        final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(jobVertex);
+        final JobID jobId = jobGraph.getJobID();
+
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+        configuration.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
+        configuration.set(
+                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+                ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+        configuration.set(
+                HighAvailabilityOptions.HA_STORAGE_PATH, 
tmpFolder.toFile().getAbsolutePath());
+
+        final MiniClusterConfiguration miniClusterConfiguration =
+                new MiniClusterConfiguration.Builder()
+                        .setConfiguration(configuration)
+                        .setNumSlotsPerTaskManager(2)
+                        .build();
+
+        final RestClusterClient<?> restClusterClient =
+                new RestClusterClient<>(configuration, "foobar");
+
+        final MiniCluster miniCluster = new 
MiniCluster(miniClusterConfiguration);
+        miniCluster.start();
+
+        
assertThatFuture(restClusterClient.submitJob(jobGraph)).eventuallySucceeds();
+
+        ClientUtils.waitUntilJobInitializationFinished(
+                () -> restClusterClient.getJobStatus(jobId).get(),
+                () -> restClusterClient.requestJobResult(jobId).get(),
+                getClass().getClassLoader());
+
+        assertThatFuture(
+                        restClusterClient.updateJobResourceRequirements(
+                                jobGraph.getJobID(),
+                                JobResourceRequirements.newBuilder()
+                                        
.setParallelismForJobVertex(jobVertex.getID(), 1, 2)
+                                        .build()))
+                .eventuallySucceeds();
+        miniCluster.closeAsyncWithoutCleaningHighAvailabilityData();
+
+        LOG.info("Start second mini cluster to recover the persisted job.");
+
+        try (final MiniCluster recoveredMiniCluster = new 
MiniCluster(miniClusterConfiguration)) {
+            recoveredMiniCluster.start();
+            
UpdateJobResourceRequirementsITCase.waitForRunningTasks(restClusterClient, 
jobId, 2);
+        }
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/UpdateJobResourceRequirementsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/UpdateJobResourceRequirementsITCase.java
new file mode 100644
index 00000000000..74829f34caf
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/UpdateJobResourceRequirementsITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/** Tests for the manual rescaling of Flink jobs using the REST API. */
+@ExtendWith(TestLoggerExtension.class)
+public class UpdateJobResourceRequirementsITCase {
+
+    private static final int NUMBER_OF_SLOTS = 4;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(createConfiguration())
+                            .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS)
+                            .build());
+
+    private static Configuration createConfiguration() {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+
+        // speed the test suite up
+        // - lower refresh interval -> controls how fast we invalidate 
ExecutionGraphCache
+        // - lower slot idle timeout -> controls how fast we return idle slots 
to TM
+        configuration.set(WebOptions.REFRESH_INTERVAL, 50L);
+        configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, 50L);
+
+        return configuration;
+    }
+
+    private RestClusterClient<?> restClusterClient;
+
+    @BeforeEach
+    void beforeEach(@InjectClusterClient RestClusterClient<?> 
restClusterClient) {
+        this.restClusterClient = restClusterClient;
+    }
+
+    @Test
+    void testManualUpScalingWithNewSlotAllocation() throws Exception {
+        final JobVertex jobVertex = new JobVertex("Single operator");
+        final int initialParallelism = 1;
+        final int parallelismAfterRescaling = 2;
+        jobVertex.setParallelism(initialParallelism);
+        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
+        final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(jobVertex);
+
+        runRescalingTest(
+                jobGraph,
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(jobVertex.getID(), 1, 
parallelismAfterRescaling)
+                        .build(),
+                initialParallelism,
+                parallelismAfterRescaling,
+                NUMBER_OF_SLOTS - parallelismAfterRescaling);
+    }
+
+    @Test
+    void testManualUpScalingWithNoNewSlotAllocation() throws Exception {
+        final int initialRunningTasks = 3;
+        final int runningTasksAfterRescale = 4;
+
+        final JobVertex jobVertex1 = new JobVertex("Operator1");
+        jobVertex1.setParallelism(1);
+        jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
+        final JobVertex jobVertex2 = new JobVertex("Operator2");
+        jobVertex2.setParallelism(2);
+        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+        final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+        jobVertex1.setSlotSharingGroup(slotSharingGroup);
+        jobVertex2.setSlotSharingGroup(slotSharingGroup);
+
+        final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(jobVertex1, jobVertex2);
+
+        runRescalingTest(
+                jobGraph,
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(jobVertex1.getID(), 1, 2)
+                        .setParallelismForJobVertex(jobVertex2.getID(), 1, 2)
+                        .build(),
+                initialRunningTasks,
+                runningTasksAfterRescale,
+                NUMBER_OF_SLOTS - 2);
+    }
+
+    @Test
+    void testManualUpScalingWithDifferentSlotSharingGroups() throws Exception {
+        final int initialParallelism = 1;
+        final int desiredParallelism = 2;
+
+        // the job is going to have two slot sharing groups
+        final int initialRunningTasks = 2 * initialParallelism;
+        final int runningTasksAfterRescale = 2 * desiredParallelism;
+
+        final JobVertex jobVertex1 = new JobVertex("Operator1");
+        jobVertex1.setParallelism(initialParallelism);
+        jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
+        final JobVertex jobVertex2 = new JobVertex("Operator2");
+        jobVertex2.setParallelism(initialParallelism);
+        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+        jobVertex1.setSlotSharingGroup(new SlotSharingGroup());
+        jobVertex2.setSlotSharingGroup(new SlotSharingGroup());
+
+        final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(jobVertex1, jobVertex2);
+
+        runRescalingTest(
+                jobGraph,
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(
+                                jobVertex1.getID(), initialParallelism, 
desiredParallelism)
+                        .setParallelismForJobVertex(
+                                jobVertex2.getID(), initialParallelism, 
desiredParallelism)
+                        .build(),
+                initialRunningTasks,
+                runningTasksAfterRescale,
+                NUMBER_OF_SLOTS - jobGraph.getSlotSharingGroups().size() * 
desiredParallelism);
+    }
+
+    @Test
+    void testManualDownScaling() throws Exception {
+        final int initialRunningTasks = 2;
+        final int runningTasksAfterRescale = 1;
+
+        final JobVertex jobVertex = new JobVertex("Operator");
+        jobVertex.setParallelism(initialRunningTasks);
+        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+        final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(jobVertex);
+
+        runRescalingTest(
+                jobGraph,
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(jobVertex.getID(), 1, 
runningTasksAfterRescale)
+                        .build(),
+                initialRunningTasks,
+                runningTasksAfterRescale,
+                NUMBER_OF_SLOTS - runningTasksAfterRescale);
+    }
+
+    private void runRescalingTest(
+            JobGraph jobGraph,
+            JobResourceRequirements newJobVertexParallelism,
+            int initialRunningTasks,
+            int runningTasksAfterRescale,
+            int freeSlotsAfterRescale)
+            throws Exception {
+        restClusterClient.submitJob(jobGraph).join();
+        try {
+            final JobID jobId = jobGraph.getJobID();
+
+            waitForRunningTasks(restClusterClient, jobId, initialRunningTasks);
+
+            restClusterClient.updateJobResourceRequirements(jobId, 
newJobVertexParallelism).join();
+
+            waitForRunningTasks(restClusterClient, jobId, 
runningTasksAfterRescale);
+            waitForAvailableSlots(restClusterClient, freeSlotsAfterRescale);
+        } finally {
+            restClusterClient.cancel(jobGraph.getJobID()).join();
+        }
+    }
+
+    private static int getNumberRunningTasks(RestClusterClient<?> 
restClusterClient, JobID jobId) {
+        final JobDetailsInfo jobDetailsInfo = 
restClusterClient.getJobDetails(jobId).join();
+        return jobDetailsInfo.getJobVertexInfos().stream()
+                .map(JobDetailsInfo.JobVertexDetailsInfo::getTasksPerState)
+                .map(tasksPerState -> 
tasksPerState.get(ExecutionState.RUNNING))
+                .mapToInt(Integer::intValue)
+                .sum();
+    }
+
+    public static void waitForRunningTasks(
+            RestClusterClient<?> restClusterClient, JobID jobId, int 
desiredNumberOfRunningTasks)
+            throws Exception {
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final int numberOfRunningTasks =
+                            getNumberRunningTasks(restClusterClient, jobId);
+                    return numberOfRunningTasks == desiredNumberOfRunningTasks;
+                });
+    }
+
+    public static void waitForAvailableSlots(
+            RestClusterClient<?> restClusterClient, int 
desiredNumberOfAvailableSlots)
+            throws Exception {
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final ClusterOverviewWithVersion clusterOverview =
+                            restClusterClient.getClusterOverview().join();
+                    return clusterOverview.getNumSlotsAvailable() == 
desiredNumberOfAvailableSlots;
+                });
+    }
+}

Reply via email to