This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 024ed963ed8 [FLINK-34070][test] Adds dedicated tests in
MiniClusterITCase for scenarios where there are not enough slots available.
024ed963ed8 is described below
commit 024ed963ed8358ae78e728108ac6c95046924f67
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Jan 12 16:57:40 2024 +0100
[FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios
where there are not enough slots available.
---
.../runtime/minicluster/MiniClusterITCase.java | 81 +++++++++++++++-------
1 file changed, 57 insertions(+), 24 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index ba3ca9b5237..e527dda14c0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -24,6 +24,7 @@ import
org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -46,6 +47,7 @@ import
org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -100,43 +102,66 @@ class MiniClusterITCase {
}
@Test
- void testHandleStreamingJobsWhenNotEnoughSlot() {
- final JobVertex vertex1 = new JobVertex("Test Vertex1");
- vertex1.setParallelism(1);
- vertex1.setMaxParallelism(1);
- vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+ void testHandlingNotEnoughSlotsThroughTimeout() throws Exception {
+ final Configuration config = new Configuration();
- final JobVertex vertex2 = new JobVertex("Test Vertex2");
- vertex2.setParallelism(1);
- vertex2.setMaxParallelism(1);
- vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+ // the slot timeout needs to be high enough to avoid causing
TimeoutException
+ final Duration slotRequestTimeout = Duration.ofMillis(100);
- vertex2.connectNewDataSetAsInput(
- vertex1, DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED);
+ // this triggers the failure for the default scheduler
+ config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT,
slotRequestTimeout.toMillis());
+ // this triggers the failure for the adaptive scheduler
+ config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
slotRequestTimeout);
- final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1,
vertex2);
+ // we have to disable sending the slot-unavailable request to allow
for the timeout to kick
+ // in
+ config.set(
+ ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY,
Duration.ofNanos(Long.MAX_VALUE));
- assertThatThrownBy(() -> runHandleJobsWhenNotEnoughSlots(jobGraph))
- .isInstanceOf(JobExecutionException.class)
- .hasRootCauseInstanceOf(NoResourceAvailableException.class)
- .hasMessageContaining("Job execution failed");
+ tryRunningJobWithoutEnoughSlots(config);
}
- private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph)
throws Exception {
- final Configuration configuration = new Configuration();
+ @Test
+ // The AdaptiveScheduler is supposed to work with the resources that are
available.
+ // That is why there is no resource allocation abort request supported.
+ @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
+ void testHandlingNotEnoughSlotsThroughEarlyAbortRequest() throws Exception
{
+ final Configuration config = new Configuration();
// the slot timeout needs to be high enough to avoid causing
TimeoutException
- Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE);
+ final Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE);
// this triggers the failure for the default scheduler
- configuration.setLong(
- JobManagerOptions.SLOT_REQUEST_TIMEOUT,
slotRequestTimeout.toMillis());
+ config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT,
slotRequestTimeout.toMillis());
// this triggers the failure for the adaptive scheduler
- configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
slotRequestTimeout);
+ config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
slotRequestTimeout);
+
+ // overwrite the default check delay to speed up the test execution
+ config.set(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY,
Duration.ofMillis(20));
// cluster startup relies on SLOT_REQUEST_TIMEOUT as a fallback if the
following parameter
// is not set which causes the test to take longer
-
configuration.set(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME,
1L);
+
config.set(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME, 1L);
+
+ tryRunningJobWithoutEnoughSlots(config);
+ }
+
+ private static void tryRunningJobWithoutEnoughSlots(Configuration
configuration)
+ throws Exception {
+ final JobVertex vertex1 = new JobVertex("Test Vertex1");
+ vertex1.setParallelism(1);
+ vertex1.setMaxParallelism(1);
+ vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobVertex vertex2 = new JobVertex("Test Vertex2");
+ vertex2.setParallelism(1);
+ vertex2.setMaxParallelism(1);
+ vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+ vertex2.connectNewDataSetAsInput(
+ vertex1, DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1,
vertex2);
final MiniClusterConfiguration cfg =
new MiniClusterConfiguration.Builder()
@@ -149,7 +174,15 @@ class MiniClusterITCase {
try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
miniCluster.start();
- miniCluster.executeJobBlocking(jobGraph);
+ assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
+ .isInstanceOf(JobExecutionException.class)
+ .hasMessageContaining("Job execution failed")
+ .extracting(Throwable::getCause)
+ .extracting(FlinkAssertions::chainOfCauses,
FlinkAssertions.STREAM_THROWABLE)
+ .anySatisfy(
+ cause ->
+ assertThat(cause)
+
.isInstanceOf(NoResourceAvailableException.class));
}
}