This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 024ed963ed8 [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available. 024ed963ed8 is described below commit 024ed963ed8358ae78e728108ac6c95046924f67 Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Fri Jan 12 16:57:40 2024 +0100 [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available. --- .../runtime/minicluster/MiniClusterITCase.java | 81 +++++++++++++++------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index ba3ca9b5237..e527dda14c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -100,43 +102,66 @@ class MiniClusterITCase { } @Test - void testHandleStreamingJobsWhenNotEnoughSlot() { - final JobVertex vertex1 = new 
JobVertex("Test Vertex1"); - vertex1.setParallelism(1); - vertex1.setMaxParallelism(1); - vertex1.setInvokableClass(BlockingNoOpInvokable.class); + void testHandlingNotEnoughSlotsThroughTimeout() throws Exception { + final Configuration config = new Configuration(); - final JobVertex vertex2 = new JobVertex("Test Vertex2"); - vertex2.setParallelism(1); - vertex2.setMaxParallelism(1); - vertex2.setInvokableClass(BlockingNoOpInvokable.class); + // the slot timeout is kept short so that it triggers the job failure quickly + final Duration slotRequestTimeout = Duration.ofMillis(100); - vertex2.connectNewDataSetAsInput( - vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + // this triggers the failure for the default scheduler + config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis()); + // this triggers the failure for the adaptive scheduler + config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout); - final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, vertex2); + // we have to disable sending the slot-unavailable request to allow for the timeout to kick + // in + config.set( + ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ofNanos(Long.MAX_VALUE)); - assertThatThrownBy(() -> runHandleJobsWhenNotEnoughSlots(jobGraph)) - .isInstanceOf(JobExecutionException.class) - .hasRootCauseInstanceOf(NoResourceAvailableException.class) - .hasMessageContaining("Job execution failed"); + tryRunningJobWithoutEnoughSlots(config); } - private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exception { - final Configuration configuration = new Configuration(); + @Test + // The AdaptiveScheduler is supposed to work with the resources that are available. + // That is why there is no resource allocation abort request supported.
+ @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") + void testHandlingNotEnoughSlotsThroughEarlyAbortRequest() throws Exception { + final Configuration config = new Configuration(); // the slot timeout needs to be high enough to avoid causing TimeoutException - Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE); + final Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE); // this triggers the failure for the default scheduler - configuration.setLong( - JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis()); + config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis()); // this triggers the failure for the adaptive scheduler - configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout); + config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout); + + // overwrite the default check delay to speed up the test execution + config.set(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ofMillis(20)); // cluster startup relies on SLOT_REQUEST_TIMEOUT as a fallback if the following parameter // is not set which causes the test to take longer - configuration.set(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME, 1L); + config.set(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME, 1L); + + tryRunningJobWithoutEnoughSlots(config); + } + + private static void tryRunningJobWithoutEnoughSlots(Configuration configuration) + throws Exception { + final JobVertex vertex1 = new JobVertex("Test Vertex1"); + vertex1.setParallelism(1); + vertex1.setMaxParallelism(1); + vertex1.setInvokableClass(BlockingNoOpInvokable.class); + + final JobVertex vertex2 = new JobVertex("Test Vertex2"); + vertex2.setParallelism(1); + vertex2.setMaxParallelism(1); + vertex2.setInvokableClass(BlockingNoOpInvokable.class); + + vertex2.connectNewDataSetAsInput( + vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(vertex1, vertex2); final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() @@ -149,7 +174,15 @@ class MiniClusterITCase { try (final MiniCluster miniCluster = new MiniCluster(cfg)) { miniCluster.start(); - miniCluster.executeJobBlocking(jobGraph); + assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph)) + .isInstanceOf(JobExecutionException.class) + .hasMessageContaining("Job execution failed") + .extracting(Throwable::getCause) + .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) + .anySatisfy( + cause -> + assertThat(cause) + .isInstanceOf(NoResourceAvailableException.class)); } }