This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 024ed963ed8 [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available.
024ed963ed8 is described below

commit 024ed963ed8358ae78e728108ac6c95046924f67
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Fri Jan 12 16:57:40 2024 +0100

    [FLINK-34070][test] Adds dedicated tests in MiniClusterITCase for scenarios where there are not enough slots available.
---
 .../runtime/minicluster/MiniClusterITCase.java     | 81 +++++++++++++++-------
 1 file changed, 57 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index ba3ca9b5237..e527dda14c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
 
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -100,43 +102,66 @@ class MiniClusterITCase {
     }
 
     @Test
-    void testHandleStreamingJobsWhenNotEnoughSlot() {
-        final JobVertex vertex1 = new JobVertex("Test Vertex1");
-        vertex1.setParallelism(1);
-        vertex1.setMaxParallelism(1);
-        vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+    void testHandlingNotEnoughSlotsThroughTimeout() throws Exception {
+        final Configuration config = new Configuration();
 
-        final JobVertex vertex2 = new JobVertex("Test Vertex2");
-        vertex2.setParallelism(1);
-        vertex2.setMaxParallelism(1);
-        vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+        // the slot timeout needs to be high enough to avoid causing 
TimeoutException
+        final Duration slotRequestTimeout = Duration.ofMillis(100);
 
-        vertex2.connectNewDataSetAsInput(
-                vertex1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+        // this triggers the failure for the default scheduler
+        config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
slotRequestTimeout.toMillis());
+        // this triggers the failure for the adaptive scheduler
+        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
slotRequestTimeout);
 
-        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, 
vertex2);
+        // we have to disable sending the slot-unavailable request to allow 
for the timeout to kick
+        // in
+        config.set(
+                ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, 
Duration.ofNanos(Long.MAX_VALUE));
 
-        assertThatThrownBy(() -> runHandleJobsWhenNotEnoughSlots(jobGraph))
-                .isInstanceOf(JobExecutionException.class)
-                .hasRootCauseInstanceOf(NoResourceAvailableException.class)
-                .hasMessageContaining("Job execution failed");
+        tryRunningJobWithoutEnoughSlots(config);
     }
 
-    private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) 
throws Exception {
-        final Configuration configuration = new Configuration();
+    @Test
+    // The AdaptiveScheduler is supposed to work with the resources that are 
available.
+    // That is why there is no resource allocation abort request supported.
+    @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
+    void testHandlingNotEnoughSlotsThroughEarlyAbortRequest() throws Exception 
{
+        final Configuration config = new Configuration();
 
         // the slot timeout needs to be high enough to avoid causing 
TimeoutException
-        Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE);
+        final Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE);
 
         // this triggers the failure for the default scheduler
-        configuration.setLong(
-                JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
slotRequestTimeout.toMillis());
+        config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
slotRequestTimeout.toMillis());
         // this triggers the failure for the adaptive scheduler
-        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
slotRequestTimeout);
+        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
slotRequestTimeout);
+
+        // overwrite the default check delay to speed up the test execution
+        config.set(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, 
Duration.ofMillis(20));
 
         // cluster startup relies on SLOT_REQUEST_TIMEOUT as a fallback if the 
following parameter
         // is not set which causes the test to take longer
-        
configuration.set(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME,
 1L);
+        
config.set(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME, 1L);
+
+        tryRunningJobWithoutEnoughSlots(config);
+    }
+
+    private static void tryRunningJobWithoutEnoughSlots(Configuration 
configuration)
+            throws Exception {
+        final JobVertex vertex1 = new JobVertex("Test Vertex1");
+        vertex1.setParallelism(1);
+        vertex1.setMaxParallelism(1);
+        vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+        final JobVertex vertex2 = new JobVertex("Test Vertex2");
+        vertex2.setParallelism(1);
+        vertex2.setMaxParallelism(1);
+        vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+        vertex2.connectNewDataSetAsInput(
+                vertex1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, 
vertex2);
 
         final MiniClusterConfiguration cfg =
                 new MiniClusterConfiguration.Builder()
@@ -149,7 +174,15 @@ class MiniClusterITCase {
         try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
             miniCluster.start();
 
-            miniCluster.executeJobBlocking(jobGraph);
+            assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
+                    .isInstanceOf(JobExecutionException.class)
+                    .hasMessageContaining("Job execution failed")
+                    .extracting(Throwable::getCause)
+                    .extracting(FlinkAssertions::chainOfCauses, 
FlinkAssertions.STREAM_THROWABLE)
+                    .anySatisfy(
+                            cause ->
+                                    assertThat(cause)
+                                            
.isInstanceOf(NoResourceAvailableException.class));
         }
     }
 

Reply via email to