This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 621ce3248a927eab2886dd0c3e557fffc878be05
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon Sep 16 12:10:09 2024 +0200

    [hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase
---
 .../test/scheduling/RescaleOnCheckpointITCase.java | 38 +++++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
index 60c2b19cca1..4febfe453cc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -40,6 +41,8 @@ import org.apache.flink.util.TestLoggerExtension;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Iterator;
@@ -51,6 +54,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(TestLoggerExtension.class)
 class RescaleOnCheckpointITCase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(RescaleOnCheckpointITCase.class);
+
     // Scaling down is used here because scaling up is not supported by the 
NumberSequenceSource
     // that's used in this test.
     private static final int NUMBER_OF_SLOTS = 4;
@@ -111,34 +116,59 @@ class RescaleOnCheckpointITCase {
         assertThat(jobVertexIterator.hasNext())
                 .as("There needs to be at least one JobVertex.")
                 .isTrue();
+        final JobVertexID jobVertexId = jobVertexIterator.next().getID();
         final JobResourceRequirements jobResourceRequirements =
                 JobResourceRequirements.newBuilder()
-                        .setParallelismForJobVertex(
-                                jobVertexIterator.next().getID(), 1, 
AFTER_RESCALE_PARALLELISM)
+                        .setParallelismForJobVertex(jobVertexId, 1, 
AFTER_RESCALE_PARALLELISM)
                         .build();
         assertThat(jobVertexIterator.hasNext())
                 .as("This test expects to have only one JobVertex.")
                 .isFalse();
 
         restClusterClient.submitJob(jobGraph).join();
+
+        final JobID jobId = jobGraph.getJobID();
         try {
-            final JobID jobId = jobGraph.getJobID();
 
+            LOG.info(
+                    "Waiting for job {} to reach parallelism of {} for vertex 
{}.",
+                    jobId,
+                    BEFORE_RESCALE_PARALLELISM,
+                    jobVertexId);
             waitForRunningTasks(restClusterClient, jobId, 
BEFORE_RESCALE_PARALLELISM);
 
+            LOG.info(
+                    "Job {} reached parallelism of {} for vertex {}. Updating 
the vertex parallelism next to {}.",
+                    jobId,
+                    BEFORE_RESCALE_PARALLELISM,
+                    jobVertexId,
+                    AFTER_RESCALE_PARALLELISM);
             restClusterClient.updateJobResourceRequirements(jobId, 
jobResourceRequirements).join();
 
             // timeout to allow any unexpected rescaling to happen anyway
             Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
 
             // verify that the previous timeout didn't result in a change of 
parallelism
+            LOG.info(
+                    "Checking that job {} hasn't changed its parallelism even 
after some delay, yet.",
+                    jobId);
             waitForRunningTasks(restClusterClient, jobId, 
BEFORE_RESCALE_PARALLELISM);
 
             miniCluster.triggerCheckpoint(jobId);
 
+            LOG.info(
+                    "Waiting for job {} to reach parallelism of {} for vertex 
{}.",
+                    jobId,
+                    AFTER_RESCALE_PARALLELISM,
+                    jobVertexId);
             waitForRunningTasks(restClusterClient, jobId, 
AFTER_RESCALE_PARALLELISM);
 
-            waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - 
AFTER_RESCALE_PARALLELISM);
+            final int expectedFreeSlotCount = NUMBER_OF_SLOTS - 
AFTER_RESCALE_PARALLELISM;
+            LOG.info(
+                    "Waiting for {} slot(s) to become available due to the 
scale down.",
+                    expectedFreeSlotCount);
+            waitForAvailableSlots(restClusterClient, expectedFreeSlotCount);
+            LOG.info("{} free slot(s) detected. Finishing test.", 
expectedFreeSlotCount);
         } finally {
             restClusterClient.cancel(jobGraph.getJobID()).join();
         }

Reply via email to