This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4dd0912bb8a809b533fe15665899dc1638253e4d Author: 1996fanrui <[email protected]> AuthorDate: Mon Aug 14 22:10:50 2023 +0800 [hotfix][JUnit5 Migration] Migrate some state tests of adaptive scheduler to junit5 --- .../runtime/scheduler/adaptive/CreatedTest.java | 115 +++--- .../adaptive/CreatingExecutionGraphTest.java | 316 +++++++-------- .../runtime/scheduler/adaptive/StateTest.java | 96 ++--- .../adaptive/WaitingForResourcesTest.java | 430 ++++++++++----------- 4 files changed, 474 insertions(+), 483 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java index ee1ee2b7c1d..302fb471c3e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java @@ -23,82 +23,97 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.failure.FailureEnricherUtils; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.FlinkException; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.function.Consumer; -import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link Created} state. */ -public class CreatedTest extends TestLogger { +class CreatedTest { - @Test - public void testCancel() throws Exception { - try (MockCreatedContext ctx = new MockCreatedContext()) { - Created created = new Created(ctx, log); + private static final Logger LOG = LoggerFactory.getLogger(CreatedTest.class); - ctx.setExpectFinished(assertNonNull()); + @RegisterExtension MockCreatedContext ctx = new MockCreatedContext(); - created.cancel(); - } + @Test + void testCancel() { + Created created = new Created(ctx, LOG); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNull(); + }); + created.cancel(); } @Test - public void testStartScheduling() throws Exception { - try (MockCreatedContext ctx = new MockCreatedContext()) { - Created created = new Created(ctx, log); + void testStartScheduling() { + Created created = new Created(ctx, LOG); - ctx.setExpectWaitingForResources(); + ctx.setExpectWaitingForResources(); - created.startScheduling(); - } + created.startScheduling(); } @Test - public void testSuspend() throws Exception { - try (MockCreatedContext ctx = new MockCreatedContext()) { - Created created = new Created(ctx, log); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); - }); - - created.suspend(new RuntimeException("Suspend")); - } + void testSuspend() { + FlinkException expectedException = new FlinkException("This is a test exception"); + Created created = new Created(ctx, LOG); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + created.suspend(expectedException); } @Test - public void testFailure() throws Exception { - try (MockCreatedContext ctx = new MockCreatedContext()) { - Created created = new Created(ctx, log); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); - }); - - created.handleGlobalFailure( - new RuntimeException("Global"), FailureEnricherUtils.EMPTY_FAILURE_LABELS); - } + void testFailure() { + Created created = new Created(ctx, LOG); + RuntimeException expectedException = new RuntimeException("This is a test exception"); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + created.handleGlobalFailure(expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS); } @Test - public void testJobInformation() throws Exception { - try (MockCreatedContext ctx = new MockCreatedContext()) { - Created created = new Created(ctx, log); - ArchivedExecutionGraph job = created.getJob(); - assertThat(job.getState(), is(JobStatus.INITIALIZING)); - } + void testJobInformation() { + Created created = new Created(ctx, LOG); + ArchivedExecutionGraph job = created.getJob(); + assertThat(job.getState()).isEqualTo(JobStatus.INITIALIZING); } - static class MockCreatedContext implements Created.Context, AutoCloseable { + static class MockCreatedContext implements Created.Context, AfterEachCallback { private final StateValidator<ArchivedExecutionGraph> finishedStateValidator = new StateValidator<>("finished"); private final StateValidator<Void> waitingForResourcesStateValidator = @@ -130,7 +145,7 @@ public class CreatedTest extends TestLogger { } @Override - public void close() throws Exception { + public void afterEach(ExtensionContext extensionContext) throws Exception { finishedStateValidator.close(); waitingForResourcesStateValidator.close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java index 2375a194206..833835b7d22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -33,12 +33,14 @@ import org.apache.flink.runtime.scheduler.GlobalFailureHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; -import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.concurrent.Executors; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -54,167 +56,171 @@ import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link CreatingExecutionGraph} state. */ -@ExtendWith(TestLoggerExtension.class) -public class CreatingExecutionGraphTest extends TestLogger { +class CreatingExecutionGraphTest { + + private static final Logger LOG = LoggerFactory.getLogger(CreatingExecutionGraphTest.class); + + @RegisterExtension + MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext(); @Test - public void testCancelTransitionsToFinished() throws Exception { - try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { - final CreatingExecutionGraph creatingExecutionGraph = - new CreatingExecutionGraph( - context, - new CompletableFuture<>(), - log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - context.setExpectFinished( - archivedExecutionGraph -> - assertThat(archivedExecutionGraph.getState()) - .isEqualTo(JobStatus.CANCELED)); - - creatingExecutionGraph.cancel(); - } + void testCancelTransitionsToFinished() { + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph( + context, + new CompletableFuture<>(), + LOG, + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); + + context.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNull(); + }); + creatingExecutionGraph.cancel(); } @Test - public void testSuspendTransitionsToFinished() throws Exception { - try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { - final CreatingExecutionGraph creatingExecutionGraph = - new CreatingExecutionGraph( - context, - new CompletableFuture<>(), - log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - context.setExpectFinished( - archivedExecutionGraph -> - assertThat(archivedExecutionGraph.getState()) - .isEqualTo(JobStatus.SUSPENDED)); - - creatingExecutionGraph.suspend(new FlinkException("Job has been suspended.")); - } + void testSuspendTransitionsToFinished() { + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph( + context, + new CompletableFuture<>(), + LOG, + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); + + FlinkException expectedException = new FlinkException("This is a test exception"); + + context.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + creatingExecutionGraph.suspend(expectedException); } @Test - public void testGlobalFailureTransitionsToFinished() throws Exception { - try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { - final CreatingExecutionGraph creatingExecutionGraph = - new CreatingExecutionGraph( - context, - new CompletableFuture<>(), - log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - context.setExpectFinished( - archivedExecutionGraph -> - assertThat(archivedExecutionGraph.getState()) - .isEqualTo(JobStatus.FAILED)); - - creatingExecutionGraph.handleGlobalFailure( - new FlinkException("Test exception"), - FailureEnricherUtils.EMPTY_FAILURE_LABELS); - } + void testGlobalFailureTransitionsToFinished() { + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph( + context, + new CompletableFuture<>(), + LOG, + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); + + RuntimeException expectedException = new RuntimeException("This is a test exception"); + + context.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + creatingExecutionGraph.handleGlobalFailure( + expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS); } @Test - public void testFailedExecutionGraphCreationTransitionsToFinished() throws Exception { - try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { - final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> - executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); - new CreatingExecutionGraph( - context, - executionGraphWithVertexParallelismFuture, - log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - context.setExpectFinished( - archivedExecutionGraph -> - assertThat(archivedExecutionGraph.getState()) - .isEqualTo(JobStatus.FAILED)); - - executionGraphWithVertexParallelismFuture.completeExceptionally( - new FlinkException("Test exception")); - } + void testFailedExecutionGraphCreationTransitionsToFinished() { + final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> + executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); + new CreatingExecutionGraph( + context, + executionGraphWithVertexParallelismFuture, + LOG, + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); + + context.setExpectFinished( + archivedExecutionGraph -> + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED)); + + executionGraphWithVertexParallelismFuture.completeExceptionally( + new FlinkException("Test exception")); } @Test - public void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() throws Exception { - try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { - final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> - executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); - new CreatingExecutionGraph( - context, - executionGraphWithVertexParallelismFuture, - log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - context.setTryToAssignSlotsFunction( - ignored -> CreatingExecutionGraph.AssignmentResult.notPossible()); - context.setExpectWaitingForResources(); - - executionGraphWithVertexParallelismFuture.complete( - getGraph(new StateTrackingMockExecutionGraph())); - } + void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() { + final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> + executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); + new CreatingExecutionGraph( + context, + executionGraphWithVertexParallelismFuture, + LOG, + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); + + context.setTryToAssignSlotsFunction( + ignored -> CreatingExecutionGraph.AssignmentResult.notPossible()); + context.setExpectWaitingForResources(); + + executionGraphWithVertexParallelismFuture.complete( + getGraph(new StateTrackingMockExecutionGraph())); } @Test - public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exception { - try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { - final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> - executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); - new CreatingExecutionGraph( - context, - executionGraphWithVertexParallelismFuture, - log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - final StateTrackingMockExecutionGraph executionGraph = - new StateTrackingMockExecutionGraph(); - - context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment); - context.setExpectedExecuting( - actualExecutionGraph -> - assertThat(actualExecutionGraph).isEqualTo(executionGraph)); - - executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph)); - } + void testSuccessfulSlotAssignmentTransitionsToExecuting() { + final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> + executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); + new CreatingExecutionGraph( + context, + executionGraphWithVertexParallelismFuture, + LOG, + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); + + final StateTrackingMockExecutionGraph executionGraph = + new StateTrackingMockExecutionGraph(); + + context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment); + context.setExpectedExecuting( + actualExecutionGraph -> assertThat(actualExecutionGraph).isEqualTo(executionGraph)); + + executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph)); } @Test - public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception { - try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { - final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> - executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); - final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef = - new AtomicReference<>(); - new CreatingExecutionGraph( - context, - executionGraphWithVertexParallelismFuture, - log, - (executionGraph, errorHandler) -> { - operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler); - return new TestingOperatorCoordinatorHandler(); - }, - null); - - final StateTrackingMockExecutionGraph executionGraph = - new StateTrackingMockExecutionGraph(); - - context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment); - context.setExpectedExecuting( - actualExecutionGraph -> - assertThat(actualExecutionGraph).isEqualTo(executionGraph)); - - executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph)); - - assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context); - } + void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() { + final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> + executionGraphWithVertexParallelismFuture = new CompletableFuture<>(); + final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef = + new AtomicReference<>(); + new CreatingExecutionGraph( + context, + executionGraphWithVertexParallelismFuture, + LOG, + (executionGraph, errorHandler) -> { + operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler); + return new TestingOperatorCoordinatorHandler(); + }, + null); + + final StateTrackingMockExecutionGraph executionGraph = + new StateTrackingMockExecutionGraph(); + + context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment); + context.setExpectedExecuting( + actualExecutionGraph -> assertThat(actualExecutionGraph).isEqualTo(executionGraph)); + + executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph)); + + assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context); } private static CreatingExecutionGraph.AssignmentResult successfulAssignment( @@ -230,7 +236,7 @@ public class CreatingExecutionGraphTest extends TestLogger { } static class MockCreatingExecutionGraphContext - implements CreatingExecutionGraph.Context, AutoCloseable { + implements CreatingExecutionGraph.Context, AfterEachCallback { private final StateValidator<ArchivedExecutionGraph> finishedStateValidator = new StateValidator<>("Finished"); private final StateValidator<Void> waitingForResourcesStateValidator = @@ -249,7 +255,7 @@ public class CreatingExecutionGraphTest extends TestLogger { // No-op. }; - private boolean hadStateTransitionHappened = false; + private boolean hasStateTransition = false; public void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) { finishedStateValidator.expectInput(asserter); @@ -278,7 +284,7 @@ public class CreatingExecutionGraphTest extends TestLogger { @Override public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { finishedStateValidator.validateInput(archivedExecutionGraph); - hadStateTransitionHappened = true; + registerStateTransition(); } @Override @@ -288,7 +294,7 @@ public class CreatingExecutionGraphTest extends TestLogger { OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) { executingStateValidator.validateInput(executionGraph); - hadStateTransitionHappened = true; + registerStateTransition(); } @Override @@ -300,7 +306,7 @@ public class CreatingExecutionGraphTest extends TestLogger { @Override public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) { - if (!hadStateTransitionHappened) { + if (!hasStateTransition()) { action.run(); } @@ -322,7 +328,7 @@ public class CreatingExecutionGraphTest extends TestLogger { @Override public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { waitingForResourcesStateValidator.validateInput(null); - hadStateTransitionHappened = true; + hasStateTransition = true; } @Override @@ -341,11 +347,19 @@ public class CreatingExecutionGraphTest extends TestLogger { } @Override - public void close() throws Exception { + public void afterEach(ExtensionContext extensionContext) throws Exception { finishedStateValidator.close(); waitingForResourcesStateValidator.close(); executingStateValidator.close(); } + + public boolean hasStateTransition() { + return hasStateTransition; + } + + public void registerStateTransition() { + hasStateTransition = true; + } } private static CreatingExecutionGraph.ExecutionGraphWithVertexParallelism getGraph( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java index fc3b0f6731c..568ed8a2c6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java @@ -18,85 +18,71 @@ package org.apache.flink.runtime.scheduler.adaptive; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** * Tests for the default methods on the {@link State} interface, based on the {@link Created} state, * as it is a simple state. */ -public class StateTest extends TestLogger { +class StateTest { + + private static final Logger LOG = LoggerFactory.getLogger(StateTest.class); + + @RegisterExtension CreatedTest.MockCreatedContext ctx = new CreatedTest.MockCreatedContext(); + @Test - public void testEmptyAs() throws Exception { - try (CreatedTest.MockCreatedContext ctx = new CreatedTest.MockCreatedContext()) { - State state = new Created(ctx, log); - assertThat(state.as(WaitingForResources.class), is(Optional.empty())); - } + void testEmptyAs() { + State state = new Created(ctx, LOG); + assertThat(state.as(WaitingForResources.class)).isEmpty(); } @Test - public void testCast() throws Exception { - try (CreatedTest.MockCreatedContext ctx = new CreatedTest.MockCreatedContext()) { - State state = new Created(ctx, log); - assertThat(state.as(Created.class), is(Optional.of(state))); - } + void testCast() { + Created state = new Created(ctx, LOG); + assertThat(state.as(Created.class)).hasValue(state); } @Test - public void testTryRunStateMismatch() throws Exception { - try (CreatedTest.MockCreatedContext ctx = new CreatedTest.MockCreatedContext()) { - State state = new Created(ctx, log); - state.tryRun( - WaitingForResources.class, (waiting -> fail("Unexpected execution")), "test"); - } + void testTryRunStateMismatch() { + State state = new Created(ctx, LOG); + state.tryRun(WaitingForResources.class, waiting -> fail("Unexpected execution"), "test"); } @Test - public void testTryRun() throws Exception { - try (CreatedTest.MockCreatedContext ctx = new CreatedTest.MockCreatedContext()) { - State state = new Created(ctx, log); - AtomicBoolean called = new AtomicBoolean(false); - state.tryRun(Created.class, created -> called.set(true), "test"); - assertThat(called.get(), is(true)); - } + void testTryRun() { + State state = new Created(ctx, LOG); + AtomicBoolean called = new AtomicBoolean(false); + state.tryRun(Created.class, created -> called.set(true), "test"); + assertThat(called).isTrue(); } @Test - public void testTryCallStateMismatch() throws Exception { - try (CreatedTest.MockCreatedContext ctx = new CreatedTest.MockCreatedContext()) { - State state = new Created(ctx, log); - Optional<String> result = - state.tryCall( - WaitingForResources.class, - Waiting -> { - fail("Unexpected execution"); - return "nope"; - }, - "test"); - assertThat(result, is(Optional.empty())); - } + void testTryCallStateMismatch() { + State state = new Created(ctx, LOG); + Optional<String> result = + state.tryCall( + WaitingForResources.class, + Waiting -> { + fail("Unexpected execution"); + return "nope"; + }, + "test"); + assertThat(result).isEmpty(); } @Test - public void testTryCall() throws Exception { - try (CreatedTest.MockCreatedContext ctx = new CreatedTest.MockCreatedContext()) { - State state = new Created(ctx, log); - Optional<String> result = - state.tryCall( - Created.class, - created -> { - return "yes"; - }, - "test"); - assertThat(result, is(Optional.of("yes"))); - } + void testTryCall() { + State state = new Created(ctx, LOG); + Optional<String> result = state.tryCall(Created.class, created -> "yes", "test"); + assertThat(result).hasValue("yes"); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java index 2f60b4aeac5..bdcc73f5c9d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java @@ -25,11 +25,14 @@ import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.failure.FailureEnricherUtils; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.ManualClock; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,256 +48,234 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** Tests for the WaitingForResources state. */ -public class WaitingForResourcesTest extends TestLogger { +class WaitingForResourcesTest { + + private static final Logger LOG = LoggerFactory.getLogger(WaitingForResourcesTest.class); + private static final Duration STABILIZATION_TIMEOUT = Duration.ofSeconds(1); + @RegisterExtension MockContext ctx = new MockContext(); + /** WaitingForResources is transitioning to Executing if there are enough resources. */ @Test - public void testTransitionToCreatingExecutionGraph() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> true); + void testTransitionToCreatingExecutionGraph() { + ctx.setHasDesiredResources(() -> true); - ctx.setExpectCreatingExecutionGraph(); + ctx.setExpectCreatingExecutionGraph(); - new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); + new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); - ctx.runScheduledTasks(); - } + ctx.runScheduledTasks(); } @Test - public void testNotEnoughResources() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); - - // we expect no state transition. - wfr.onNewResourcesAvailable(); - } + void testNotEnoughResources() { + ctx.setHasDesiredResources(() -> false); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); + + // we expect no state transition. + wfr.onNewResourcesAvailable(); } @Test - public void testNotifyNewResourcesAvailable() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> false); // initially, not enough resources - WaitingForResources wfr = - new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); - ctx.setHasDesiredResources(() -> true); // make resources available - ctx.setExpectCreatingExecutionGraph(); - wfr.onNewResourcesAvailable(); // .. and notify - } + void testNotifyNewResourcesAvailable() { + ctx.setHasDesiredResources(() -> false); // initially, not enough resources + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); + ctx.setHasDesiredResources(() -> true); // make resources available + ctx.setExpectCreatingExecutionGraph(); + wfr.onNewResourcesAvailable(); // .. and notify } @Test - public void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws Exception { - try (MockContext ctx = new MockContext()) { - - Duration noStabilizationTimeout = Duration.ofMillis(0); - WaitingForResources wfr = - new WaitingForResources( - ctx, log, Duration.ofSeconds(1000), noStabilizationTimeout); - - ctx.setHasDesiredResources(() -> false); - ctx.setHasSufficientResources(() -> true); - ctx.setExpectCreatingExecutionGraph(); - wfr.onNewResourcesAvailable(); - } + void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() { + Duration noStabilizationTimeout = Duration.ofMillis(0); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000), noStabilizationTimeout); + + ctx.setHasDesiredResources(() -> false); + ctx.setHasSufficientResources(() -> true); + ctx.setExpectCreatingExecutionGraph(); + wfr.onNewResourcesAvailable(); } @Test - public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws Exception { - try (MockContext ctx = new MockContext()) { + void testNoSchedulingIfStabilizationTimeoutIsConfigured() { + Duration stabilizationTimeout = Duration.ofMillis(50000); - Duration stabilizationTimeout = Duration.ofMillis(50000); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000), stabilizationTimeout); - WaitingForResources wfr = - new WaitingForResources( - ctx, log, Duration.ofSeconds(1000), stabilizationTimeout); + ctx.setHasDesiredResources(() -> false); + ctx.setHasSufficientResources(() -> true); + wfr.onNewResourcesAvailable(); + // we are not triggering the scheduled tasks, to simulate a long stabilization timeout - ctx.setHasDesiredResources(() -> false); - ctx.setHasSufficientResources(() -> true); - wfr.onNewResourcesAvailable(); - // we are not triggering the scheduled tasks, to simulate a long stabilization timeout - - assertThat(ctx.hasStateTransition(), is(false)); - } + assertThat(ctx.hasStateTransition()).isFalse(); } @Test - public void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() throws Exception { - try (MockContext ctx = new MockContext()) { - - Duration initialResourceTimeout = Duration.ofMillis(-1); - Duration stabilizationTimeout = Duration.ofMillis(50_000L); - - WaitingForResources wfr = - new WaitingForResources( - ctx, - log, - initialResourceTimeout, - stabilizationTimeout, - ctx.getClock(), - null); - // sufficient resources available - ctx.setHasDesiredResources(() -> false); - ctx.setHasSufficientResources(() -> true); - - // notify about sufficient resources - wfr.onNewResourcesAvailable(); - - ctx.setExpectCreatingExecutionGraph(); - - // execute all runnables and trigger expected state transition - final Duration afterStabilizationTimeout = stabilizationTimeout.plusMillis(1); - ctx.advanceTimeByMillis(afterStabilizationTimeout.toMillis()); - - ctx.runScheduledTasks(afterStabilizationTimeout.toMillis()); - - assertThat(ctx.hasStateTransition(), is(true)); - } + void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() { + Duration initialResourceTimeout = Duration.ofMillis(-1); + Duration stabilizationTimeout = Duration.ofMillis(50_000L); + + WaitingForResources wfr = + new WaitingForResources( + ctx, + LOG, + initialResourceTimeout, + stabilizationTimeout, + ctx.getClock(), + null); + // sufficient resources available + ctx.setHasDesiredResources(() -> false); + ctx.setHasSufficientResources(() -> true); + + // notify about sufficient resources + wfr.onNewResourcesAvailable(); + + ctx.setExpectCreatingExecutionGraph(); + + // execute all runnables and trigger expected state transition + final Duration afterStabilizationTimeout = stabilizationTimeout.plusMillis(1); + ctx.advanceTimeByMillis(afterStabilizationTimeout.toMillis()); + + ctx.runScheduledTasks(afterStabilizationTimeout.toMillis()); + + assertThat(ctx.hasStateTransition()).isTrue(); } @Test - public void testStabilizationTimeoutReset() throws Exception { - try (MockContext ctx = new MockContext()) { - - Duration initialResourceTimeout = Duration.ofMillis(-1); - Duration stabilizationTimeout = Duration.ofMillis(50L); - - WaitingForResources wfr = - new WaitingForResources( - ctx, - log, - initialResourceTimeout, - stabilizationTimeout, - ctx.getClock(), - null); - - ctx.setHasDesiredResources(() -> false); - - // notify about resources, trigger stabilization timeout - ctx.setHasSufficientResources(() -> true); - ctx.advanceTimeByMillis(40); // advance time, but don't trigger stabilizationTimeout - wfr.onNewResourcesAvailable(); - - // notify again, but insufficient (reset stabilization timeout) - ctx.setHasSufficientResources(() -> false); - ctx.advanceTimeByMillis(40); - wfr.onNewResourcesAvailable(); - - // notify again, but sufficient, trigger timeout - ctx.setHasSufficientResources(() -> true); - ctx.advanceTimeByMillis(40); - wfr.onNewResourcesAvailable(); - - // sanity check: no state transition has been triggered so far - assertThat(ctx.hasStateTransition(), is(false)); - assertThat(ctx.getTestDuration(), greaterThan(stabilizationTimeout)); - - ctx.setExpectCreatingExecutionGraph(); - - ctx.advanceTimeByMillis(1); - assertThat(ctx.hasStateTransition(), is(false)); - - ctx.advanceTimeByMillis(stabilizationTimeout.toMillis()); - assertThat(ctx.hasStateTransition(), is(true)); - } + void testStabilizationTimeoutReset() { + Duration initialResourceTimeout = Duration.ofMillis(-1); + Duration stabilizationTimeout = Duration.ofMillis(50L); + + WaitingForResources wfr = + new WaitingForResources( + ctx, + LOG, + initialResourceTimeout, + stabilizationTimeout, + ctx.getClock(), + null); + + ctx.setHasDesiredResources(() -> false); + + // notify about resources, trigger stabilization timeout + ctx.setHasSufficientResources(() -> true); + ctx.advanceTimeByMillis(40); // advance time, but don't trigger stabilizationTimeout + wfr.onNewResourcesAvailable(); + + // notify again, but insufficient (reset stabilization timeout) + ctx.setHasSufficientResources(() -> false); + ctx.advanceTimeByMillis(40); + wfr.onNewResourcesAvailable(); + + // notify again, but sufficient, trigger timeout + ctx.setHasSufficientResources(() -> true); + ctx.advanceTimeByMillis(40); + wfr.onNewResourcesAvailable(); + + // sanity check: no state transition has been triggered so far + assertThat(ctx.hasStateTransition()).isFalse(); + assertThat(ctx.getTestDuration()).isGreaterThan(stabilizationTimeout); + + ctx.setExpectCreatingExecutionGraph(); + + ctx.advanceTimeByMillis(1); + assertThat(ctx.hasStateTransition()).isFalse(); + + ctx.advanceTimeByMillis(stabilizationTimeout.toMillis()); + assertThat(ctx.hasStateTransition()).isTrue(); } @Test - public void testNoStateTransitionOnNoResourceTimeout() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, log, Duration.ofMillis(-1), STABILIZATION_TIMEOUT); - - ctx.runScheduledTasks(); - assertThat(ctx.hasStateTransition(), is(false)); - } + void testNoStateTransitionOnNoResourceTimeout() { + ctx.setHasDesiredResources(() -> false); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ofMillis(-1), STABILIZATION_TIMEOUT); + + ctx.runScheduledTasks(); + assertThat(ctx.hasStateTransition()).isFalse(); } @Test - public void testStateTransitionOnResourceTimeout() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); + void testStateTransitionOnResourceTimeout() { + ctx.setHasDesiredResources(() -> false); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); - ctx.setExpectCreatingExecutionGraph(); + ctx.setExpectCreatingExecutionGraph(); - ctx.runScheduledTasks(); - } + ctx.runScheduledTasks(); } @Test - public void testTransitionToFinishedOnGlobalFailure() throws Exception { - final String testExceptionString = "This is a test exception"; - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); - assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); - assertTrue( - archivedExecutionGraph - .getFailureInfo() - .getExceptionAsString() - .contains(testExceptionString)); - }); - - wfr.handleGlobalFailure( - new RuntimeException(testExceptionString), - FailureEnricherUtils.EMPTY_FAILURE_LABELS); - } + void testTransitionToFinishedOnGlobalFailure() { + ctx.setHasDesiredResources(() -> false); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); + RuntimeException expectedException = new RuntimeException("This is a test exception"); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + wfr.handleGlobalFailure(expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS); } @Test - public void testCancel() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); - - ctx.setExpectFinished( - (archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState(), is(JobStatus.CANCELED)); - })); - wfr.cancel(); - } + void testCancel() { + ctx.setHasDesiredResources(() -> false); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNull(); + }); + wfr.cancel(); } @Test - public void testSuspend() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, log, Duration.ZERO, STABILIZATION_TIMEOUT); - - ctx.setExpectFinished( - (archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); - assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); - })); - - wfr.suspend(new RuntimeException("suspend")); - } + void testSuspend() { + ctx.setHasDesiredResources(() -> false); + WaitingForResources wfr = + new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); + + FlinkException expectedException = new FlinkException("This is a test exception"); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + wfr.suspend(expectedException); } @Test - public void testInternalRunScheduledTasks_correctExecutionOrder() { - MockContext ctx = new MockContext(); + void testInternalRunScheduledTasks_correctExecutionOrder() { AtomicBoolean firstRun = new AtomicBoolean(false); AtomicBoolean secondRun = new AtomicBoolean(false); AtomicBoolean thirdRun = new AtomicBoolean(false); @@ -302,16 +283,12 @@ public class WaitingForResourcesTest extends TestLogger { Runnable runFirstBecauseOfLowDelay = () -> firstRun.set(true); Runnable runSecondBecauseOfScheduleOrder = () -> { - if (!firstRun.get()) { - fail("order violated"); - } + assertThat(firstRun).as("order violated").isTrue(); secondRun.set(true); }; Runnable runLastBecauseOfHighDelay = () -> { - if (!secondRun.get()) { - fail("order violated"); - } + assertThat(secondRun).as("order violated").isTrue(); thirdRun.set(true); }; @@ -328,19 +305,15 @@ public class WaitingForResourcesTest extends TestLogger { ctx.runScheduledTasks(); - assertThat(thirdRun.get(), is(true)); + assertThat(thirdRun).isTrue(); } @Test - public void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() { - MockContext ctx = new MockContext(); - + void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() { AtomicBoolean executed = new AtomicBoolean(false); Runnable executeOnce = () -> { - if (executed.get()) { - fail("Multiple executions"); - } + assertThat(executed).as("Multiple executions").isFalse(); executed.set(true); }; @@ -349,12 +322,11 @@ public class WaitingForResourcesTest extends TestLogger { // execute at least twice ctx.runScheduledTasks(); ctx.runScheduledTasks(); + assertThat(executed).isTrue(); } @Test - public void testInternalRunScheduledTasks_upperBoundRespected() { - MockContext ctx = new MockContext(); - + void testInternalRunScheduledTasks_upperBoundRespected() { Runnable executeNever = () -> fail("Not expected"); ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeNever, Duration.ofMillis(10)); @@ -363,8 +335,7 @@ public class WaitingForResourcesTest extends TestLogger { } @Test - public void testInternalRunScheduledTasks_scheduleTaskFromRunnable() { - MockContext ctx = new MockContext(); + void testInternalRunScheduledTasks_scheduleTaskFromRunnable() { final State state = new AdaptiveSchedulerTest.DummyState(); AtomicBoolean executed = new AtomicBoolean(false); @@ -378,10 +349,11 @@ public class WaitingForResourcesTest extends TestLogger { // choose time that includes inner execution as well ctx.runScheduledTasks(10); - assertThat(executed.get(), is(true)); + assertThat(executed).isTrue(); } - private static class MockContext implements WaitingForResources.Context, AutoCloseable { + private static class MockContext implements WaitingForResources.Context, AfterEachCallback { + private static final Logger LOG = LoggerFactory.getLogger(MockContext.class); private final StateValidator<Void> creatingExecutionGraphStateValidator = @@ -441,7 +413,7 @@ public class WaitingForResourcesTest extends TestLogger { } @Override - public void close() throws Exception { + public void afterEach(ExtensionContext extensionContext) throws Exception { creatingExecutionGraphStateValidator.close(); finishedStateValidator.close(); } @@ -474,7 +446,7 @@ public class WaitingForResourcesTest extends TestLogger { final ScheduledTask<Void> scheduledTask = new ScheduledTask<>( () -> { - if (!hasStateTransition) { + if (!hasStateTransition()) { action.run(); } @@ -490,19 +462,23 @@ public class WaitingForResourcesTest extends TestLogger { @Override public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { finishedStateValidator.validateInput(archivedExecutionGraph); - hasStateTransition = true; + registerStateTransition(); } @Override public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { creatingExecutionGraphStateValidator.validateInput(null); - hasStateTransition = true; + registerStateTransition(); } public boolean hasStateTransition() { return hasStateTransition; } + public void registerStateTransition() { + hasStateTransition = true; + } + public Clock getClock() { return testTime.getClock(); } @@ -550,6 +526,6 @@ public class WaitingForResourcesTest extends TestLogger { } static <T> Consumer<T> assertNonNull() { - return (item) -> assertThat(item, notNullValue()); + return (item) -> assertThat(item).isNotNull(); } }
