This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4e3b6646f389eeb395a4dbb951d13bab02cb8db Author: David Moravek <[email protected]> AuthorDate: Mon Feb 27 19:08:37 2023 +0100 [FLINK-31470] Add integration tests for Externalized Declarative Resource Management. Signed-off-by: David Moravek <[email protected]> --- .../client/program/rest/RestClusterClient.java | 36 ++++ .../flink/runtime/minicluster/MiniCluster.java | 28 +-- ...pdateJobResourceRequirementsRecoveryITCase.java | 115 ++++++++++ .../UpdateJobResourceRequirementsITCase.java | 235 +++++++++++++++++++++ 4 files changed, 402 insertions(+), 12 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index a05350d5e58..c81619a97bf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesF import org.apache.flink.runtime.highavailability.DefaultClientHighAvailabilityServicesFactory; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -50,6 +51,8 @@ import org.apache.flink.runtime.rest.FileUpload; import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import 
org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; @@ -79,6 +82,8 @@ import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; +import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; +import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders; import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; @@ -802,6 +807,37 @@ public class RestClusterClient<T> implements ClusterClient<T> { return resultFuture; } + /** + * Update {@link JobResourceRequirements} of a given job. + * + * @param jobId jobId specifies the job for which to change the resource requirements + * @param jobResourceRequirements new resource requirements for the provided job + * @return Future which is completed upon successful operation. + */ + public CompletableFuture<Acknowledge> updateJobResourceRequirements( + JobID jobId, JobResourceRequirements jobResourceRequirements) { + final JobMessageParameters params = new JobMessageParameters(); + params.jobPathParameter.resolve(jobId); + + return sendRequest( + JobResourcesRequirementsUpdateHeaders.INSTANCE, + params, + new JobResourceRequirementsBody(jobResourceRequirements)) + .thenApply(ignored -> Acknowledge.get()); + } + + /** + * Get an overview of the Flink cluster. + * + * @return Future with the {@link ClusterOverviewWithVersion cluster overview}. 
+ */ + public CompletableFuture<ClusterOverviewWithVersion> getClusterOverview() { + return sendRequest( + ClusterOverviewHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()); + } + // ====================================== // Legacy stuff we actually implement // ====================================== diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 1cb0a52213f..7f39ab1ae0c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -648,6 +648,14 @@ public class MiniCluster implements AutoCloseableAsync { */ @Override public CompletableFuture<Void> closeAsync() { + return closeInternal(true); + } + + public CompletableFuture<Void> closeAsyncWithoutCleaningHighAvailabilityData() { + return closeInternal(false); + } + + private CompletableFuture<Void> closeInternal(boolean cleanupHaData) { synchronized (lock) { if (running) { LOG.info("Shutting down Flink Mini Cluster"); @@ -678,7 +686,7 @@ public class MiniCluster implements AutoCloseableAsync { final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards( rpcServicesTerminationFuture, - this::terminateMiniClusterServices); + () -> terminateMiniClusterServices(cleanupHaData)); final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.composeAfterwards( @@ -1008,7 +1016,7 @@ public class MiniCluster implements AutoCloseableAsync { // When MiniCluster uses the local RPC, the provided JobGraph is passed directly to the // Dispatcher. This means that any mutations to the JG can affect the Dispatcher behaviour, // so we rather clone it to guard against this. 
- final JobGraph clonedJobGraph = cloneJobGraph(jobGraph); + final JobGraph clonedJobGraph = InstantiationUtil.cloneUnchecked(jobGraph); checkRestoreModeForChangelogStateBackend(clonedJobGraph); final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture(); @@ -1240,7 +1248,7 @@ public class MiniCluster implements AutoCloseableAsync { }); } - private void terminateMiniClusterServices() throws Exception { + private void terminateMiniClusterServices(boolean cleanupHaData) throws Exception { // collect the first exception, but continue and add all successive // exceptions as suppressed Exception exception = null; @@ -1268,7 +1276,11 @@ public class MiniCluster implements AutoCloseableAsync { // shut down high-availability services if (haServices != null) { try { - haServices.closeAndCleanupAllData(); + if (cleanupHaData) { + haServices.closeAndCleanupAllData(); + } else { + haServices.close(); + } } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } @@ -1329,14 +1341,6 @@ public class MiniCluster implements AutoCloseableAsync { } } - private static JobGraph cloneJobGraph(JobGraph jobGraph) { - try { - return InstantiationUtil.clone(jobGraph); - } catch (ClassNotFoundException | IOException e) { - throw new IllegalStateException("Unable to clone the provided JobGraph.", e); - } - } - public CompletableFuture<Void> invalidateClusterDataset(AbstractID clusterDatasetId) { return resourceManagerGatewayRetriever .getFuture() diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java new file mode 100644 index 00000000000..d3d673a834d --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; + +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; + +/** Tests for recovery of rescaled jobs. */ +@ExtendWith(TestLoggerExtension.class) +class UpdateJobResourceRequirementsRecoveryITCase { + + private static final Logger LOG = + LoggerFactory.getLogger(UpdateJobResourceRequirementsRecoveryITCase.class); + + @RegisterExtension + private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + /** Tests that a rescaled job graph will be recovered with the latest parallelism. */ + @Test + void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception { + final JobVertex jobVertex = new JobVertex("operator"); + jobVertex.setParallelism(1); + jobVertex.setInvokableClass(BlockingNoOpInvokable.class); + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex); + final JobID jobId = jobGraph.getJobID(); + + final Configuration configuration = new Configuration(); + + configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); + configuration.set(HighAvailabilityOptions.HA_MODE, "zookeeper"); + configuration.set( + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, + ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + configuration.set( + HighAvailabilityOptions.HA_STORAGE_PATH, tmpFolder.toFile().getAbsolutePath()); + + final MiniClusterConfiguration miniClusterConfiguration = + new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumSlotsPerTaskManager(2) + .build(); + + final RestClusterClient<?> restClusterClient = + new RestClusterClient<>(configuration, "foobar"); + + final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); + miniCluster.start(); + + assertThatFuture(restClusterClient.submitJob(jobGraph)).eventuallySucceeds(); + + 
ClientUtils.waitUntilJobInitializationFinished( + () -> restClusterClient.getJobStatus(jobId).get(), + () -> restClusterClient.requestJobResult(jobId).get(), + getClass().getClassLoader()); + + assertThatFuture( + restClusterClient.updateJobResourceRequirements( + jobGraph.getJobID(), + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(jobVertex.getID(), 1, 2) + .build())) + .eventuallySucceeds(); + miniCluster.closeAsyncWithoutCleaningHighAvailabilityData(); + + LOG.info("Start second mini cluster to recover the persisted job."); + + try (final MiniCluster recoveredMiniCluster = new MiniCluster(miniClusterConfiguration)) { + recoveredMiniCluster.start(); + UpdateJobResourceRequirementsITCase.waitForRunningTasks(restClusterClient, jobId, 2); + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/UpdateJobResourceRequirementsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/UpdateJobResourceRequirementsITCase.java new file mode 100644 index 00000000000..74829f34caf --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/UpdateJobResourceRequirementsITCase.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.scheduling; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Tests for the manual rescaling of Flink jobs using the REST API. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class UpdateJobResourceRequirementsITCase { + + private static final int NUMBER_OF_SLOTS = 4; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(createConfiguration()) + .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS) + .build()); + + private static Configuration createConfiguration() { + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); + + // speed the test suite up + // - lower refresh interval -> controls how fast we invalidate ExecutionGraphCache + // - lower slot idle timeout -> controls how fast we return idle slots to TM + configuration.set(WebOptions.REFRESH_INTERVAL, 50L); + configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, 50L); + + return configuration; + } + + private RestClusterClient<?> restClusterClient; + + @BeforeEach + void beforeEach(@InjectClusterClient RestClusterClient<?> restClusterClient) { + this.restClusterClient = restClusterClient; + } + + @Test + void testManualUpScalingWithNewSlotAllocation() throws Exception { + final JobVertex jobVertex = new JobVertex("Single operator"); + final int initialParallelism = 1; + final int parallelismAfterRescaling = 2; + jobVertex.setParallelism(initialParallelism); + jobVertex.setInvokableClass(BlockingNoOpInvokable.class); + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex); + + runRescalingTest( + jobGraph, + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(jobVertex.getID(), 1, parallelismAfterRescaling) + .build(), + initialParallelism, + parallelismAfterRescaling, + NUMBER_OF_SLOTS - parallelismAfterRescaling); + } + + @Test + void testManualUpScalingWithNoNewSlotAllocation() throws Exception { + final int initialRunningTasks = 3; + final int runningTasksAfterRescale = 
4; + + final JobVertex jobVertex1 = new JobVertex("Operator1"); + jobVertex1.setParallelism(1); + jobVertex1.setInvokableClass(BlockingNoOpInvokable.class); + final JobVertex jobVertex2 = new JobVertex("Operator2"); + jobVertex2.setParallelism(2); + jobVertex2.setInvokableClass(BlockingNoOpInvokable.class); + + final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); + jobVertex1.setSlotSharingGroup(slotSharingGroup); + jobVertex2.setSlotSharingGroup(slotSharingGroup); + + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex1, jobVertex2); + + runRescalingTest( + jobGraph, + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(jobVertex1.getID(), 1, 2) + .setParallelismForJobVertex(jobVertex2.getID(), 1, 2) + .build(), + initialRunningTasks, + runningTasksAfterRescale, + NUMBER_OF_SLOTS - 2); + } + + @Test + void testManualUpScalingWithDifferentSlotSharingGroups() throws Exception { + final int initialParallelism = 1; + final int desiredParallelism = 2; + + // the job is going to have two slot sharing groups + final int initialRunningTasks = 2 * initialParallelism; + final int runningTasksAfterRescale = 2 * desiredParallelism; + + final JobVertex jobVertex1 = new JobVertex("Operator1"); + jobVertex1.setParallelism(initialParallelism); + jobVertex1.setInvokableClass(BlockingNoOpInvokable.class); + final JobVertex jobVertex2 = new JobVertex("Operator2"); + jobVertex2.setParallelism(initialParallelism); + jobVertex2.setInvokableClass(BlockingNoOpInvokable.class); + + jobVertex1.setSlotSharingGroup(new SlotSharingGroup()); + jobVertex2.setSlotSharingGroup(new SlotSharingGroup()); + + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex1, jobVertex2); + + runRescalingTest( + jobGraph, + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex( + jobVertex1.getID(), initialParallelism, desiredParallelism) + .setParallelismForJobVertex( + jobVertex2.getID(), initialParallelism, desiredParallelism) + 
.build(), + initialRunningTasks, + runningTasksAfterRescale, + NUMBER_OF_SLOTS - jobGraph.getSlotSharingGroups().size() * desiredParallelism); + } + + @Test + void testManualDownScaling() throws Exception { + final int initialRunningTasks = 2; + final int runningTasksAfterRescale = 1; + + final JobVertex jobVertex = new JobVertex("Operator"); + jobVertex.setParallelism(initialRunningTasks); + jobVertex.setInvokableClass(BlockingNoOpInvokable.class); + + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex); + + runRescalingTest( + jobGraph, + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(jobVertex.getID(), 1, runningTasksAfterRescale) + .build(), + initialRunningTasks, + runningTasksAfterRescale, + NUMBER_OF_SLOTS - runningTasksAfterRescale); + } + + private void runRescalingTest( + JobGraph jobGraph, + JobResourceRequirements newJobVertexParallelism, + int initialRunningTasks, + int runningTasksAfterRescale, + int freeSlotsAfterRescale) + throws Exception { + restClusterClient.submitJob(jobGraph).join(); + try { + final JobID jobId = jobGraph.getJobID(); + + waitForRunningTasks(restClusterClient, jobId, initialRunningTasks); + + restClusterClient.updateJobResourceRequirements(jobId, newJobVertexParallelism).join(); + + waitForRunningTasks(restClusterClient, jobId, runningTasksAfterRescale); + waitForAvailableSlots(restClusterClient, freeSlotsAfterRescale); + } finally { + restClusterClient.cancel(jobGraph.getJobID()).join(); + } + } + + private static int getNumberRunningTasks(RestClusterClient<?> restClusterClient, JobID jobId) { + final JobDetailsInfo jobDetailsInfo = restClusterClient.getJobDetails(jobId).join(); + return jobDetailsInfo.getJobVertexInfos().stream() + .map(JobDetailsInfo.JobVertexDetailsInfo::getTasksPerState) + .map(tasksPerState -> tasksPerState.get(ExecutionState.RUNNING)) + .mapToInt(Integer::intValue) + .sum(); + } + + public static void waitForRunningTasks( + RestClusterClient<?> restClusterClient, 
JobID jobId, int desiredNumberOfRunningTasks) + throws Exception { + CommonTestUtils.waitUntilCondition( + () -> { + final int numberOfRunningTasks = + getNumberRunningTasks(restClusterClient, jobId); + return numberOfRunningTasks == desiredNumberOfRunningTasks; + }); + } + + public static void waitForAvailableSlots( + RestClusterClient<?> restClusterClient, int desiredNumberOfAvailableSlots) + throws Exception { + CommonTestUtils.waitUntilCondition( + () -> { + final ClusterOverviewWithVersion clusterOverview = + restClusterClient.getClusterOverview().join(); + return clusterOverview.getNumSlotsAvailable() == desiredNumberOfAvailableSlots; + }); + } +}
