This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 621ce3248a927eab2886dd0c3e557fffc878be05 Author: Matthias Pohl <[email protected]> AuthorDate: Mon Sep 16 12:10:09 2024 +0200 [hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase --- .../test/scheduling/RescaleOnCheckpointITCase.java | 38 +++++++++++++++++++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java index 60c2b19cca1..4febfe453cc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -40,6 +41,8 @@ import org.apache.flink.util.TestLoggerExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Iterator; @@ -51,6 +54,8 @@ import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(TestLoggerExtension.class) class RescaleOnCheckpointITCase { + private static final Logger LOG = LoggerFactory.getLogger(RescaleOnCheckpointITCase.class); + // Scaling down is used here because scaling up is not supported by the NumberSequenceSource // that's used in this test. private static final int NUMBER_OF_SLOTS = 4; @@ -111,34 +116,59 @@ class RescaleOnCheckpointITCase { assertThat(jobVertexIterator.hasNext()) .as("There needs to be at least one JobVertex.") .isTrue(); + final JobVertexID jobVertexId = jobVertexIterator.next().getID(); final JobResourceRequirements jobResourceRequirements = JobResourceRequirements.newBuilder() - .setParallelismForJobVertex( - jobVertexIterator.next().getID(), 1, AFTER_RESCALE_PARALLELISM) + .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM) .build(); assertThat(jobVertexIterator.hasNext()) .as("This test expects to have only one JobVertex.") .isFalse(); restClusterClient.submitJob(jobGraph).join(); + + final JobID jobId = jobGraph.getJobID(); try { - final JobID jobId = jobGraph.getJobID(); + LOG.info( + "Waiting for job {} to reach parallelism of {} for vertex {}.", + jobId, + BEFORE_RESCALE_PARALLELISM, + jobVertexId); waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM); + LOG.info( + "Job {} reached parallelism of {} for vertex {}. Updating the vertex parallelism next to {}.", + jobId, + BEFORE_RESCALE_PARALLELISM, + jobVertexId, + AFTER_RESCALE_PARALLELISM); restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join(); // timeout to allow any unexpected rescaling to happen anyway Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis()); // verify that the previous timeout didn't result in a change of parallelism + LOG.info( + "Checking that job {} hasn't changed its parallelism even after some delay, yet.", + jobId); waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM); miniCluster.triggerCheckpoint(jobId); + LOG.info( + "Waiting for job {} to reach parallelism of {} for vertex {}.", + jobId, + AFTER_RESCALE_PARALLELISM, + jobVertexId); waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM); - waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM); + final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM; + LOG.info( + "Waiting for {} slot(s) to become available due to the scale down.", + expectedFreeSlotCount); + waitForAvailableSlots(restClusterClient, expectedFreeSlotCount); + LOG.info("{} free slot(s) detected. Finishing test.", expectedFreeSlotCount); } finally { restClusterClient.cancel(jobGraph.getJobID()).join(); }
