This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bc9a99fa061 [FLINK-36948][tests] Remove deprecated FlinkMatchers
bc9a99fa061 is described below
commit bc9a99fa061e4cd2f003742442c7151bfa141c30
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Dec 26 20:33:55 2024 +0100
[FLINK-36948][tests] Remove deprecated FlinkMatchers
---
.../DispatcherCachedOperationsHandlerTest.java | 66 ++--
.../dispatcher/DispatcherCleanupITCase.java | 86 ++---
.../dispatcher/DispatcherResourceCleanupTest.java | 25 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 223 +++++------
.../DispatcherResourceManagerComponentTest.java | 9 +-
.../runtime/heartbeat/HeartbeatManagerTest.java | 77 ++--
.../jobmanager/DefaultExecutionPlanStoreTest.java | 97 +++--
.../catalog/CatalogBaseTableResolutionTest.java | 85 ++---
.../flink/table/catalog/SchemaResolutionTest.java | 12 +-
.../extraction/DataTypeExtractorScalaTest.scala | 3 +-
.../table/codesplit/JavaCodeSplitterTest.java | 17 +-
.../types/extraction/DataTypeExtractorTest.java | 8 +-
.../flink/table/planner/codegen/CodeSplitTest.java | 4 +-
.../window/tvf/slicing/SliceAssignerTestBase.java | 11 +-
.../testutils/FlinkCompletableFutureAssert.java | 19 +-
.../apache/flink/core/testutils/FlinkMatchers.java | 406 ---------------------
.../flink/runtime/jobmaster/JobMasterITCase.java | 16 +-
.../test/scheduling/AdaptiveSchedulerITCase.java | 92 ++---
18 files changed, 359 insertions(+), 897 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
index 7768e36724c..889f44d846d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
@@ -41,11 +41,8 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link DispatcherCachedOperationsHandler} component. */
public class DispatcherCachedOperationsHandlerTest extends TestLogger {
@@ -124,18 +121,17 @@ public class DispatcherCachedOperationsHandlerTest
extends TestLogger {
TriggerSavepointMode.SAVEPOINT,
TIMEOUT);
- assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1));
- assertThat(
- triggerSavepointFunction.getInvocationParameters().get(0),
- is(
+ assertThat(triggerSavepointFunction.getNumberOfInvocations()).isOne();
+ assertThat(triggerSavepointFunction.getInvocationParameters().get(0))
+ .isEqualTo(
new Tuple4<>(
jobID,
targetDirectory,
SavepointFormatType.CANONICAL,
- TriggerSavepointMode.SAVEPOINT)));
+ TriggerSavepointMode.SAVEPOINT));
- assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
- assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+ assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get());
+ assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get());
}
@Test
@@ -155,18 +151,17 @@ public class DispatcherCachedOperationsHandlerTest
extends TestLogger {
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT,
TIMEOUT);
- assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1));
- assertThat(
- stopWithSavepointFunction.getInvocationParameters().get(0),
- is(
+ assertThat(stopWithSavepointFunction.getNumberOfInvocations()).isOne();
+ assertThat(stopWithSavepointFunction.getInvocationParameters().get(0))
+ .isEqualTo(
new Tuple4<>(
jobID,
targetDirectory,
SavepointFormatType.CANONICAL,
-
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT)));
+
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT));
- assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
- assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+ assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get());
+ assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get());
}
@Test
@@ -190,23 +185,22 @@ public class DispatcherCachedOperationsHandlerTest
extends TestLogger {
.get();
// should not complete because we wait for the result to be accessed
- assertThat(
- savepointTriggerCache.closeAsync(),
- FlinkMatchers.willNotComplete(Duration.ofMillis(10)));
+ FlinkAssertions.assertThatFuture(savepointTriggerCache.closeAsync())
+ .willNotCompleteWithin(Duration.ofMillis(10));
}
@Test
public void throwsIfCacheIsShuttingDown() {
savepointTriggerCache.closeAsync();
- assertThrows(
- IllegalStateException.class,
- () ->
- handler.triggerSavepoint(
- operationKey,
- targetDirectory,
- SavepointFormatType.CANONICAL,
- TriggerSavepointMode.SAVEPOINT,
- TIMEOUT));
+ assertThatThrownBy(
+ () ->
+ handler.triggerSavepoint(
+ operationKey,
+ targetDirectory,
+ SavepointFormatType.CANONICAL,
+ TriggerSavepointMode.SAVEPOINT,
+ TIMEOUT))
+ .isInstanceOf(IllegalStateException.class);
}
@Test
@@ -224,15 +218,17 @@ public class DispatcherCachedOperationsHandlerTest
extends TestLogger {
CompletableFuture<OperationResult<String>> statusFuture =
handler.getSavepointStatus(operationKey);
- assertEquals(statusFuture.get(),
OperationResult.success(savepointLocation));
+
assertThat(statusFuture.get()).isEqualTo(OperationResult.success(savepointLocation));
}
@Test
- public void getStatusFailsIfKeyUnknown() throws InterruptedException {
+ public void getStatusFailsIfKeyUnknown() {
CompletableFuture<OperationResult<String>> statusFuture =
handler.getSavepointStatus(operationKey);
- assertThat(statusFuture,
futureFailedWith(UnknownOperationKeyException.class));
+ FlinkAssertions.assertThatFuture(statusFuture)
+ .eventuallyFails()
+ .withCauseOfType(UnknownOperationKeyException.class);
}
private abstract static class TriggerCheckpointSpyFunction
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 92725c4b09d..d07a38156d8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CleanupOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
@@ -53,10 +52,7 @@ import
org.apache.flink.runtime.testutils.TestingExecutionPlanStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.collection.IsEmptyCollection;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -65,16 +61,14 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** An integration test for various fail-over scenarios of the {@link
Dispatcher} component. */
public class DispatcherCleanupITCase extends AbstractDispatcherTest {
@@ -95,8 +89,8 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
// First job cleanup still succeeded for the
// CompletedCheckpointStore because the
JobGraph cleanup happens
// after the JobManagerRunner closing
-
assertTrue(previous.getShutdownStatus().isPresent());
-
assertTrue(previous.getAllCheckpoints().isEmpty());
+
assertThat(previous.getShutdownStatus()).isPresent();
+
assertThat(previous.getAllCheckpoints()).isEmpty();
return new EmbeddedCompletedCheckpointStore(
maxCheckpoints,
previous.getAllCheckpoints(),
@@ -191,12 +185,11 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
successfulCleanupLatch.await();
- assertThat(actualGlobalCleanupCallCount.get(), equalTo(numberOfErrors
+ 1));
+
assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(numberOfErrors + 1);
- assertThat(
- "The JobGraph should be removed from ExecutionPlanStore.",
- haServices.getExecutionPlanStore().getJobIds(),
- IsEmptyCollection.empty());
+ assertThat(haServices.getExecutionPlanStore().getJobIds())
+ .as("The JobGraph should be removed from ExecutionPlanStore.")
+ .isEmpty();
CommonTestUtils.waitUntilCondition(
() ->
haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get());
@@ -233,20 +226,15 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
CommonTestUtils.waitUntilCondition(() -> jobManagerRunnerEntry.get()
!= null);
- assertThat(
- "The JobResultStore should have this job still marked as
dirty.",
-
haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get(),
- CoreMatchers.is(true));
+
assertThat(haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get())
+ .as("The JobResultStore should have this job still marked as
dirty.")
+ .isTrue();
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- try {
- dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
- Assert.fail("Should fail because cancelling the cleanup is not
allowed.");
- } catch (ExecutionException e) {
- assertThat(e,
FlinkMatchers.containsCause(JobCancellationFailedException.class));
- }
+ assertThatThrownBy(() -> dispatcherGateway.cancelJob(jobId,
TIMEOUT).get())
+ .hasCauseInstanceOf(JobCancellationFailedException.class);
jobManagerRunnerCleanupFuture.complete(null);
CommonTestUtils.waitUntilCondition(
@@ -305,25 +293,22 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway,
jobId);
firstCleanupTriggered.await();
- assertThat(
- "The cleanup should have been triggered only once.",
- actualGlobalCleanupCallCount.get(),
- equalTo(1));
- assertThat(
- "The cleanup should not have reached the successful cleanup
code path.",
- successfulJobGraphCleanup.isDone(),
- equalTo(false));
+ assertThat(actualGlobalCleanupCallCount.get())
+ .as("The cleanup should have been triggered only once.")
+ .isOne();
+ assertThat(successfulJobGraphCleanup.isDone())
+ .as("The cleanup should not have reached the successful
cleanup code path.")
+ .isFalse();
+ assertThat(haServices.getExecutionPlanStore().getJobIds())
+ .as("The JobGraph is still stored in the ExecutionPlanStore.")
+ .containsExactly(jobId);
assertThat(
- "The JobGraph is still stored in the ExecutionPlanStore.",
- haServices.getExecutionPlanStore().getJobIds(),
- equalTo(Collections.singleton(jobId)));
- assertThat(
- "The JobResultStore should have this job marked as dirty.",
- haServices.getJobResultStore().getDirtyResults().stream()
- .map(JobResult::getJobId)
- .collect(Collectors.toSet()),
- equalTo(Collections.singleton(jobId)));
+
haServices.getJobResultStore().getDirtyResults().stream()
+ .map(JobResult::getJobId)
+ .collect(Collectors.toSet()))
+ .as("The JobResultStore should have this job marked as dirty.")
+ .containsExactly(jobId);
// Run a second dispatcher, that restores our finished job.
final Dispatcher secondDispatcher =
@@ -338,17 +323,16 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
CommonTestUtils.waitUntilCondition(
() ->
haServices.getJobResultStore().getDirtyResults().isEmpty());
- assertThat(
- "The JobGraph is not stored in the ExecutionPlanStore.",
- haServices.getExecutionPlanStore().getJobIds(),
- IsEmptyCollection.empty());
- assertTrue(
- "The JobResultStore has the job listed as clean.",
-
haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get());
+ assertThat(haServices.getExecutionPlanStore().getJobIds())
+ .as("The JobGraph is not stored in the ExecutionPlanStore.")
+ .isEmpty();
+
assertThat(haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get())
+ .as("The JobResultStore has the job listed as clean.")
+ .isTrue();
- assertThat(successfulJobGraphCleanup.get(), equalTo(jobId));
+ assertThat(successfulJobGraphCleanup.get()).isEqualTo(jobId);
- assertThat(actualGlobalCleanupCallCount.get(), equalTo(2));
+ assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(2);
}
private void waitForJobToFinish(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index cae270a84ed..09c3be44bae 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -84,9 +84,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static
org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -272,12 +271,7 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
startDispatcher(new FailingJobManagerRunnerFactory(new
FlinkException("Test exception")));
final CompletableFuture<Acknowledge> submissionFuture = submitJob();
- try {
- submissionFuture.get();
- fail("Job submission was expected to fail.");
- } catch (ExecutionException ee) {
- assertThat(ee, containsCause(JobSubmissionException.class));
- }
+
assertThatThrownBy(submissionFuture::get).hasCauseInstanceOf(JobSubmissionException.class);
assertGlobalCleanupTriggered(jobId);
}
@@ -647,15 +641,12 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
// submit and fail during job master runner construction
queue.offer(Optional.of(testException));
- try {
- dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1)).get();
- fail("A FlinkException is expected");
- } catch (Throwable expectedException) {
- assertThat(expectedException, containsCause(FlinkException.class));
- assertThat(expectedException,
containsMessage(testException.getMessage()));
- // make sure we've cleaned up in correct order (including HA)
- assertGlobalCleanupTriggered(jobId);
- }
+ assertThatThrownBy(() -> dispatcherGateway.submitJob(jobGraph,
Duration.ofMinutes(1)).get())
+ .hasCauseInstanceOf(FlinkException.class)
+ .hasRootCauseMessage(testException.getMessage());
+
+ // make sure we've cleaned up in correct order (including HA)
+ assertGlobalCleanupTriggered(jobId);
// don't fail this time
queue.offer(Optional.empty());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index dbb8d652b28..a1d44b3712e 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.testutils.FlinkAssertions;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -107,7 +106,6 @@ import
org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.assertj.core.api.Assertions;
-import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -144,16 +142,8 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for the {@link Dispatcher} component. */
public class DispatcherTest extends AbstractDispatcherTest {
@@ -219,9 +209,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
jobMasterLeaderElection.getStartFuture().get();
- assertTrue(
- "jobManagerRunner was not started",
- jobMasterLeaderElection.getStartFuture().isDone());
+ assertThat(jobMasterLeaderElection.getStartFuture())
+ .as("jobManagerRunner was not started")
+ .isDone();
}
@Test
@@ -290,11 +280,11 @@ public class DispatcherTest extends
AbstractDispatcherTest {
.eventuallySucceeds();
// verify that all but one submission failed as duplicates
- Assertions.assertThat(exceptions)
+ assertThat(exceptions)
.hasSize(numThreads - 1)
.allSatisfy(
t ->
- Assertions.assertThat(t)
+ assertThat(t)
.hasCauseInstanceOf(DuplicateJobSubmissionException.class));
}
@@ -308,12 +298,14 @@ public class DispatcherTest extends
AbstractDispatcherTest {
dispatcher.getSelfGateway(DispatcherGateway.class);
final CompletableFuture<Acknowledge> submitFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
- final ExecutionException executionException =
- assertThrows(ExecutionException.class, submitFuture::get);
- assertTrue(executionException.getCause() instanceof
DuplicateJobSubmissionException);
- final DuplicateJobSubmissionException duplicateException =
- (DuplicateJobSubmissionException)
executionException.getCause();
- assertTrue(duplicateException.isGloballyTerminated());
+ assertThatThrownBy(submitFuture::get)
+ .hasCauseInstanceOf(DuplicateJobSubmissionException.class)
+ .satisfies(
+ e ->
+ assertThat(
+
((DuplicateJobSubmissionException) e.getCause())
+
.isGloballyTerminated())
+ .isTrue());
}
@Test
@@ -328,12 +320,14 @@ public class DispatcherTest extends
AbstractDispatcherTest {
dispatcher.getSelfGateway(DispatcherGateway.class);
final CompletableFuture<Acknowledge> submitFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
- final ExecutionException executionException =
- assertThrows(ExecutionException.class, submitFuture::get);
- assertTrue(executionException.getCause() instanceof
DuplicateJobSubmissionException);
- final DuplicateJobSubmissionException duplicateException =
- (DuplicateJobSubmissionException)
executionException.getCause();
- assertFalse(duplicateException.isGloballyTerminated());
+ assertThatThrownBy(submitFuture::get)
+ .hasCauseInstanceOf(DuplicateJobSubmissionException.class)
+ .satisfies(
+ e ->
+ assertThat(
+
((DuplicateJobSubmissionException) e.getCause())
+
.isGloballyTerminated())
+ .isFalse());
}
/**
@@ -364,12 +358,8 @@ public class DispatcherTest extends AbstractDispatcherTest
{
CompletableFuture<Acknowledge> acknowledgeFuture =
dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT);
- try {
- acknowledgeFuture.get();
- fail("job submission should have failed");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.findThrowable(e,
JobSubmissionException.class).isPresent());
- }
+ assertThatThrownBy(() -> acknowledgeFuture.get())
+ .hasCauseInstanceOf(JobSubmissionException.class);
}
@Test
@@ -384,15 +374,14 @@ public class DispatcherTest extends
AbstractDispatcherTest {
blockingJobMaster.waitForBlockingInit();
// ensure INITIALIZING status
- assertThat(
- dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
- is(JobStatus.INITIALIZING));
+ assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+ .isSameAs(JobStatus.INITIALIZING);
// ensure correct JobDetails
MultipleJobsDetails multiDetails =
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
- assertEquals(1, multiDetails.getJobs().size());
- assertEquals(jobId,
multiDetails.getJobs().iterator().next().getJobId());
+ assertThat(multiDetails.getJobs()).hasSize(1);
+
assertThat(multiDetails.getJobs().iterator().next().getJobId()).isEqualTo(jobId);
// let the initialization finish.
blockingJobMaster.unblockJobMasterInitialization();
@@ -410,24 +399,21 @@ public class DispatcherTest extends
AbstractDispatcherTest {
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- assertThat(
- dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
- is(JobStatus.INITIALIZING));
+ assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+ .isSameAs(JobStatus.INITIALIZING);
// this call is supposed to fail
- try {
- dispatcherGateway
- .triggerSavepointAndGetLocation(
- jobId,
- "file:///tmp/savepoint",
- SavepointFormatType.CANONICAL,
- TriggerSavepointMode.SAVEPOINT,
- TIMEOUT)
- .get();
- fail("Previous statement should have failed");
- } catch (ExecutionException t) {
- assertTrue(t.getCause() instanceof
UnavailableDispatcherOperationException);
- }
+ assertThatThrownBy(
+ () ->
+ dispatcherGateway
+ .triggerSavepointAndGetLocation(
+ jobId,
+ "file:///tmp/savepoint",
+ SavepointFormatType.CANONICAL,
+ TriggerSavepointMode.SAVEPOINT,
+ TIMEOUT)
+ .get())
+
.hasCauseInstanceOf(UnavailableDispatcherOperationException.class);
}
@Test
@@ -490,8 +476,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
// wait for job to finish
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
// sanity check
- assertThat(
- dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
is(JobStatus.CANCELED));
+ assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+ .isSameAs(JobStatus.CANCELED);
dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
}
@@ -524,16 +510,15 @@ public class DispatcherTest extends
AbstractDispatcherTest {
// wait for job to finish
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
// sanity check
- assertThat(
- dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
is(JobStatus.FINISHED));
+ assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+ .isSameAs(JobStatus.FINISHED);
final CompletableFuture<Acknowledge> cancelFuture =
dispatcherGateway.cancelJob(jobId, TIMEOUT);
- assertThat(
- cancelFuture,
- FlinkMatchers.futureWillCompleteExceptionally(
- FlinkJobTerminatedWithoutCancellationException.class,
Duration.ofHours(8)));
+ FlinkAssertions.assertThatFuture(cancelFuture)
+ .eventuallyFails()
+
.withCauseOfType(FlinkJobTerminatedWithoutCancellationException.class);
}
@Test
@@ -570,10 +555,10 @@ public class DispatcherTest extends
AbstractDispatcherTest {
// wait for job to finish
dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
// sanity check
- assertThat(
- dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
is(JobStatus.SUSPENDED));
+ assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+ .isSameAs(JobStatus.SUSPENDED);
- assertThat(archiveAttemptFuture.isDone(), is(false));
+ assertThat(archiveAttemptFuture).isNotDone();
}
@Test
@@ -615,7 +600,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
// get failure cause
ArchivedExecutionGraph execGraph =
dispatcherGateway.requestJob(jobGraph.getJobID(),
TIMEOUT).get();
- assertThat(execGraph.getState(), is(JobStatus.FAILED));
+ assertThat(execGraph.getState()).isSameAs(JobStatus.FAILED);
Assert.assertNotNull(execGraph.getFailureInfo());
Throwable throwable =
@@ -625,7 +610,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
.deserializeError(ClassLoader.getSystemClassLoader());
// ensure correct exception type
- assertThat(throwable.getMessage(), equalTo(testFailure.getMessage()));
+ assertThat(throwable).hasMessage(testFailure.getMessage());
}
/** Test that {@link JobResult} is cached when the job finishes. */
@@ -654,13 +639,12 @@ public class DispatcherTest extends
AbstractDispatcherTest {
dispatcher.completeJobExecution(failedExecutionGraphInfo);
- assertThat(
- dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(),
- equalTo(expectedState));
+ assertThat(dispatcherGateway.requestJobStatus(failedJobId,
TIMEOUT).get())
+ .isEqualTo(expectedState);
final CompletableFuture<ExecutionGraphInfo>
completableFutureCompletableFuture =
dispatcher.callAsyncInMainThread(
() ->
dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT));
- assertThat(completableFutureCompletableFuture.get(),
is(failedExecutionGraphInfo));
+
assertThat(completableFutureCompletableFuture.get()).isEqualTo(failedExecutionGraphInfo);
}
@Test
@@ -685,8 +669,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
CompletableFuture<CheckpointStatsSnapshot> resultsFuture =
dispatcher.callAsyncInMainThread(
() -> dispatcher.requestCheckpointStats(jobId,
TIMEOUT));
-
Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
- Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot);
+ assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
+ assertThat(resultsFuture).isCompletedWithValue(snapshot);
}
@Test
@@ -734,8 +718,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
CompletableFuture<CheckpointStatsSnapshot> resultsFuture =
dispatcher.callAsyncInMainThread(
() -> dispatcher.requestCheckpointStats(jobId,
TIMEOUT));
-
Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
- Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot);
+ assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
+ assertThat(resultsFuture).isCompletedWithValue(snapshot);
}
private CheckpointStatsSnapshot
getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() {
@@ -763,8 +747,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
dispatcher.callAsyncInMainThread(
() -> dispatcher.requestCheckpointStats(jobId,
TIMEOUT));
-
Assertions.assertThat(resultsFuture).failsWithin(Duration.ofSeconds(1));
- Assertions.assertThat(resultsFuture).isCompletedExceptionally();
+ assertThat(resultsFuture).failsWithin(Duration.ofSeconds(1));
+ assertThat(resultsFuture).isCompletedExceptionally();
Assertions.assertThatThrownBy(resultsFuture::get)
.hasCauseInstanceOf(FlinkJobNotFoundException.class)
@@ -781,12 +765,8 @@ public class DispatcherTest extends AbstractDispatcherTest
{
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- try {
- dispatcherGateway.requestJob(new JobID(), TIMEOUT).get();
- } catch (ExecutionException e) {
- final Throwable throwable =
ExceptionUtils.stripExecutionException(e);
- assertThat(throwable, instanceOf(FlinkJobNotFoundException.class));
- }
+ assertThatThrownBy(() -> dispatcherGateway.requestJob(new JobID(),
TIMEOUT).get())
+ .hasCauseInstanceOf(FlinkJobNotFoundException.class);
}
/** Tests that we can dispose a savepoint. */
@@ -804,11 +784,11 @@ public class DispatcherTest extends
AbstractDispatcherTest {
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- assertThat(Files.exists(savepointPath), is(true));
+ assertThat(Files.exists(savepointPath)).isTrue();
dispatcherGateway.disposeSavepoint(externalPointer.toString(),
TIMEOUT).get();
- assertThat(Files.exists(savepointPath), is(false));
+ assertThat(Files.exists(savepointPath)).isFalse();
}
@Nonnull
@@ -892,10 +872,8 @@ public class DispatcherTest extends AbstractDispatcherTest
{
final Throwable error =
fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMillis(),
TimeUnit.MILLISECONDS);
- assertThat(
- ExceptionUtils.findThrowableWithMessage(error,
testException.getMessage())
- .isPresent(),
- is(true));
+ assertThat(ExceptionUtils.findThrowableWithMessage(error,
testException.getMessage()))
+ .isPresent();
fatalErrorHandler.clearError();
}
@@ -946,15 +924,13 @@ public class DispatcherTest extends
AbstractDispatcherTest {
final TestingJobManagerRunner cleanupRunner =
cleanupRunnerFactory.takeCreatedJobManagerRunner();
- assertThat(
- "The CleanupJobManagerRunner has the wrong job ID attached.",
- cleanupRunner.getJobID(),
- is(jobIdOfRecoveredDirtyJobs));
+ assertThat(cleanupRunner.getJobID())
+ .as("The CleanupJobManagerRunner has the wrong job ID
attached.")
+ .isEqualTo(jobIdOfRecoveredDirtyJobs);
- assertThat(
- "No JobMaster should have been started.",
- jobManagerRunnerFactory.getQueueSize(),
- is(0));
+ assertThat(jobManagerRunnerFactory.getQueueSize())
+ .as("No JobMaster should have been started.")
+ .isZero();
}
@Test
@@ -975,11 +951,11 @@ public class DispatcherTest extends
AbstractDispatcherTest {
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));
+ assertThat(dispatcher.getNumberJobs(TIMEOUT).get()).isOne();
dispatcher.close();
- assertThat(submittedExecutionPlanStore.contains(jobGraph.getJobID()),
Matchers.is(true));
+
assertThat(submittedExecutionPlanStore.contains(jobGraph.getJobID())).isTrue();
}
/** Tests that a submitted job is suspended if the Dispatcher is
terminated. */
@@ -998,12 +974,12 @@ public class DispatcherTest extends
AbstractDispatcherTest {
final CompletableFuture<JobResult> jobResultFuture =
dispatcherGateway.requestJobResult(jobGraph.getJobID(),
TIMEOUT);
- assertThat(jobResultFuture.isDone(), is(false));
+ assertThat(jobResultFuture).isNotDone();
dispatcher.close();
final JobResult jobResult = jobResultFuture.get();
- assertEquals(jobResult.getApplicationStatus(),
ApplicationStatus.UNKNOWN);
+
assertThat(jobResult.getApplicationStatus()).isSameAs(ApplicationStatus.UNKNOWN);
}
@Test
@@ -1093,13 +1069,10 @@ public class DispatcherTest extends
AbstractDispatcherTest {
processFuture.join();
- assertThat(releaseJobGraphFuture.get(), is(jobGraph.getJobID()));
+ assertThat(releaseJobGraphFuture.get()).isEqualTo(jobGraph.getJobID());
- try {
- removeJobGraphFuture.get(10L, TimeUnit.MILLISECONDS);
- fail("onRemovedExecutionPlan should not remove the job from the
ExecutionPlanStore.");
- } catch (TimeoutException expected) {
- }
+ assertThatThrownBy(() -> removeJobGraphFuture.get(10L,
TimeUnit.MILLISECONDS))
+ .isInstanceOf(TimeoutException.class);
}
@Test
@@ -1119,7 +1092,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
final long initializationTimestamp =
initializationTimestampQueue.take();
// ensure all statuses are set in the ExecutionGraph
- assertThat(initializationTimestamp, greaterThan(0L));
+ assertThat(initializationTimestamp).isGreaterThan(0L);
}
@Test
@@ -1286,8 +1259,8 @@ public class DispatcherTest extends
AbstractDispatcherTest {
private static void assertOnlyContainsSingleJobWithState(
final JobStatus expectedJobStatus, final MultipleJobsDetails
multipleJobsDetails) {
final Collection<JobDetails> finishedJobDetails =
multipleJobsDetails.getJobs();
- assertEquals(1, finishedJobDetails.size());
- assertEquals(expectedJobStatus,
finishedJobDetails.iterator().next().getStatus());
+ assertThat(finishedJobDetails).hasSize(1);
+
assertThat(finishedJobDetails.iterator().next().getStatus()).isEqualTo(expectedJobStatus);
}
@Test
@@ -1304,7 +1277,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
.setRecoveredJobs(Collections.singleton(new
JobGraph(jobId1, "foobar")))
.build(rpcService);
- Assertions.assertThat(blobServer.getFile(jobId1,
blobKey1)).hasBinaryContent(fileContent);
+ assertThat(blobServer.getFile(jobId1,
blobKey1)).hasBinaryContent(fileContent);
Assertions.assertThatThrownBy(() -> blobServer.getFile(jobId2,
blobKey2))
.isInstanceOf(NoSuchFileException.class);
}
@@ -1326,16 +1299,16 @@ public class DispatcherTest extends
AbstractDispatcherTest {
submitFuture.get();
final ArchivedExecutionGraph archivedExecutionGraph =
dispatcherGateway.requestJob(failedJobId, TIMEOUT).get();
-
Assertions.assertThat(archivedExecutionGraph.getJobID()).isEqualTo(failedJobId);
-
Assertions.assertThat(archivedExecutionGraph.getJobName()).isEqualTo(failedJobName);
-
Assertions.assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
- Assertions.assertThat(archivedExecutionGraph.getFailureInfo())
+ assertThat(archivedExecutionGraph.getJobID()).isEqualTo(failedJobId);
+
assertThat(archivedExecutionGraph.getJobName()).isEqualTo(failedJobName);
+
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
+ assertThat(archivedExecutionGraph.getFailureInfo())
.isNotNull()
.extracting(ErrorInfo::getException)
.extracting(e ->
e.deserializeError(Thread.currentThread().getContextClassLoader()))
.satisfies(
exception ->
- Assertions.assertThat(exception)
+ assertThat(exception)
.isInstanceOf(RuntimeException.class)
.hasMessage("Test exception."));
}
@@ -1430,8 +1403,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
.isInstanceOf(RestHandlerException.class)
.satisfies(
e ->
- Assertions.assertThat(
- ((RestHandlerException)
e).getHttpResponseStatus())
+ assertThat(((RestHandlerException)
e).getHttpResponseStatus())
.isSameAs(HttpResponseStatus.INTERNAL_SERVER_ERROR));
// verify that persist errors prevents the requirement from being
applied
@@ -1509,8 +1481,8 @@ public class DispatcherTest extends
AbstractDispatcherTest {
testConcurrentModificationIsPrevented(
dispatcherGateway, blockingJobMaster, secondJobGraph);
- Assertions.assertThat(firstPendingUpdateFuture).isNotCompleted();
- Assertions.assertThat(secondPendingUpdateFuture).isNotCompleted();
+ assertThat(firstPendingUpdateFuture).isNotCompleted();
+ assertThat(secondPendingUpdateFuture).isNotCompleted();
blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();
@@ -1541,8 +1513,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
.isInstanceOf(RestHandlerException.class)
.satisfies(
e ->
- Assertions.assertThat(
- ((RestHandlerException)
e).getHttpResponseStatus())
+ assertThat(((RestHandlerException)
e).getHttpResponseStatus())
.isSameAs(HttpResponseStatus.CONFLICT));
assertThatFuture(pendingUpdateFuture).isNotCompleted();
@@ -1634,7 +1605,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
Collection<FailureEnricher> failureEnricher,
long initializationTimestamp)
throws Exception {
- assertEquals(expectedJobId, graph.getJobID());
+ assertThat(graph.getJobID()).isEqualTo(expectedJobId);
final JobMasterGateway jobMasterGateway =
new TestingJobMasterGatewayBuilder()
.setRequestJobSupplier(
@@ -1943,7 +1914,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
- assertEquals(expectedJobId, graph.getJobID());
+ assertThat(graph.getJobID()).isEqualTo(expectedJobId);
return
JobMasterServiceLeadershipRunnerFactory.INSTANCE.createJobManagerRunner(
graph,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
index 21ed3e30d50..116a25d07c5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.entrypoint.component;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
@@ -36,9 +37,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link DispatcherResourceManagerComponent}. */
public class DispatcherResourceManagerComponentTest extends TestLogger {
@@ -59,7 +58,7 @@ public class DispatcherResourceManagerComponentTest extends
TestLogger {
terminationFuture.completeExceptionally(expectedException);
final Throwable error = fatalErrorHandler.getException();
- assertThat(error, containsCause(expectedException));
+ assertThat(error).hasCause(expectedException);
}
private DispatcherResourceManagerComponent
createDispatcherResourceManagerComponent(
@@ -94,7 +93,7 @@ public class DispatcherResourceManagerComponentTest extends
TestLogger {
terminationFuture.completeExceptionally(expectedException);
final CompletableFuture<Throwable> errorFuture =
fatalErrorHandler.getErrorFuture();
- assertThat(errorFuture, willNotComplete(Duration.ofMillis(10L)));
+
FlinkAssertions.assertThatFuture(errorFuture).willNotCompleteWithin(Duration.ofMillis(10L));
}
/**
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 955338f9f3e..a52c1501ccd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.heartbeat;
-import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
@@ -29,7 +29,6 @@ import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-import org.hamcrest.Matcher;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
@@ -51,14 +50,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link HeartbeatManager}. */
public class HeartbeatManagerTest extends TestLogger {
@@ -116,12 +109,12 @@ public class HeartbeatManagerTest extends TestLogger {
final String inputPayload1 = "foobar";
heartbeatManager.requestHeartbeat(targetResourceID, inputPayload1);
- assertThat(reportedPayloads.take(), is(inputPayload1));
- assertThat(reportedPayloadsHeartbeatTarget.take(), is(outputPayload));
+ assertThat(reportedPayloads.take()).isEqualTo(inputPayload1);
+
assertThat(reportedPayloadsHeartbeatTarget.take()).isEqualTo(outputPayload);
final String inputPayload2 = "barfoo";
heartbeatManager.receiveHeartbeat(targetResourceID, inputPayload2);
- assertThat(reportedPayloads.take(), is(inputPayload2));
+ assertThat(reportedPayloads.take()).isEqualTo(inputPayload2);
}
/** Tests that the heartbeat monitors are updated when receiving a new
heartbeat signal. */
@@ -157,9 +150,9 @@ public class HeartbeatManagerTest extends TestLogger {
final List<ScheduledFuture<?>> scheduledTasksAfterHeartbeat =
manuallyTriggeredScheduledExecutor.getAllScheduledTasks();
- assertThat(scheduledTasksAfterHeartbeat, hasSize(2));
+ assertThat(scheduledTasksAfterHeartbeat).hasSize(2);
// the first scheduled future should be cancelled by the heartbeat
update
- assertTrue(scheduledTasksAfterHeartbeat.get(0).isCancelled());
+ assertThat(scheduledTasksAfterHeartbeat.get(0).isCancelled()).isTrue();
}
/** Tests that a heartbeat timeout is signaled if the heartbeat is not
reported in time. */
@@ -197,12 +190,12 @@ public class HeartbeatManagerTest extends TestLogger {
Thread.sleep(HEARTBEAT_INTERVAL);
}
- assertFalse(timeoutFuture.isDone());
+ assertThat(timeoutFuture).isNotDone();
ResourceID timeoutResourceID =
timeoutFuture.get(2 * HEARTBEAT_TIMEOUT,
TimeUnit.MILLISECONDS);
- assertEquals(targetResourceID, timeoutResourceID);
+ assertThat(timeoutResourceID).isEqualTo(targetResourceID);
}
/**
@@ -264,20 +257,19 @@ public class HeartbeatManagerTest extends TestLogger {
Thread.sleep(2 * HEARTBEAT_TIMEOUT);
- assertFalse(targetHeartbeatTimeoutFuture.isDone());
+ assertThat(targetHeartbeatTimeoutFuture).isNotDone();
heartbeatManagerTarget.stop();
ResourceID timeoutResourceID =
targetHeartbeatTimeoutFuture.get(2 * HEARTBEAT_TIMEOUT,
TimeUnit.MILLISECONDS);
- assertThat(timeoutResourceID, is(resourceIdTarget));
+ assertThat(timeoutResourceID).isEqualTo(resourceIdTarget);
int numberHeartbeats = (int) (2 * HEARTBEAT_TIMEOUT /
HEARTBEAT_INTERVAL);
- final Matcher<Integer> numberHeartbeatsMatcher =
greaterThanOrEqualTo(numberHeartbeats / 2);
- assertThat(numReportPayloadCallsTarget.get(),
is(numberHeartbeatsMatcher));
- assertThat(numReportPayloadCallsSender.get(),
is(numberHeartbeatsMatcher));
+
assertThat(numReportPayloadCallsTarget.get()).isGreaterThanOrEqualTo(numberHeartbeats
/ 2);
+
assertThat(numReportPayloadCallsSender.get()).isGreaterThanOrEqualTo(numberHeartbeats
/ 2);
}
/** Tests that after unmonitoring a target, there won't be a timeout
triggered. */
@@ -311,12 +303,9 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatManager.unmonitorTarget(targetID);
- try {
- timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
- fail("Timeout should time out.");
- } catch (TimeoutException ignored) {
- // the timeout should not be completed since we unmonitored the
target
- }
+ assertThatThrownBy(() -> timeoutFuture.get(2 * heartbeatTimeout,
TimeUnit.MILLISECONDS))
+ // the timeout should not be completed since we unmonitored
the target
+ .isInstanceOf(TimeoutException.class);
}
/** Tests that the last heartbeat from an unregistered target equals -1. */
@@ -337,7 +326,7 @@ public class HeartbeatManagerTest extends TestLogger {
LOG);
try {
- assertEquals(-1L,
heartbeatManager.getLastHeartbeatFrom(ResourceID.generate()));
+
assertThat(heartbeatManager.getLastHeartbeatFrom(ResourceID.generate())).isEqualTo(-1L);
} finally {
heartbeatManager.stop();
}
@@ -363,13 +352,14 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatManager.monitorTarget(
target, new
TestingHeartbeatTargetBuilder<>().createTestingHeartbeatTarget());
- assertEquals(0L, heartbeatManager.getLastHeartbeatFrom(target));
+ assertThat(heartbeatManager.getLastHeartbeatFrom(target)).isZero();
final long currentTime = System.currentTimeMillis();
heartbeatManager.receiveHeartbeat(target, null);
- assertTrue(heartbeatManager.getLastHeartbeatFrom(target) >=
currentTime);
+ assertThat(heartbeatManager.getLastHeartbeatFrom(target))
+ .isGreaterThanOrEqualTo(currentTime);
} finally {
heartbeatManager.stop();
}
@@ -429,10 +419,11 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatManager.monitorTarget(specialTargetId,
specialHeartbeatTarget);
heartbeatManager.requestHeartbeat(someTargetId, null);
- assertThat(someHeartbeatPayloadFuture.get(),
is(payloads.get(someTargetId)));
+
assertThat(someHeartbeatPayloadFuture.get()).isEqualTo(payloads.get(someTargetId));
heartbeatManager.requestHeartbeat(specialTargetId, null);
- assertThat(specialHeartbeatPayloadFuture.get(),
is(payloads.get(specialTargetId)));
+ assertThat(specialHeartbeatPayloadFuture.get())
+ .isEqualTo(payloads.get(specialTargetId));
} finally {
heartbeatManager.stop();
}
@@ -482,9 +473,10 @@ public class HeartbeatManagerTest extends TestLogger {
someTargetReceivedLatch.await(5, TimeUnit.SECONDS);
specialTargetReceivedLatch.await(5, TimeUnit.SECONDS);
- assertEquals(defaultResponse,
someHeartbeatTarget.getLastRequestedHeartbeatPayload());
- assertEquals(
- specialResponse,
specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
+ assertThat(defaultResponse)
+
.isEqualTo(someHeartbeatTarget.getLastRequestedHeartbeatPayload());
+ assertThat(specialResponse)
+
.isEqualTo(specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
} finally {
heartbeatManager.stop();
scheduledThreadPoolExecutor.shutdown();
@@ -568,9 +560,8 @@ public class HeartbeatManagerTest extends TestLogger {
for (int i = 0; i < failedRpcRequestsUntilUnreachable - 1; i++) {
heartbeatManager.requestHeartbeat(someTargetId, null);
- assertThat(
- unreachableTargetFuture,
- FlinkMatchers.willNotComplete(willNotCompleteWithin));
+ FlinkAssertions.assertThatFuture(unreachableTargetFuture)
+ .willNotCompleteWithin(willNotCompleteWithin);
}
heartbeatManager.requestHeartbeat(someTargetId, null);
@@ -619,8 +610,8 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatManager.requestHeartbeat(someTargetId, null);
}
- assertThat(
- unreachableTargetFuture,
FlinkMatchers.willNotComplete(willNotCompleteWithin));
+ FlinkAssertions.assertThatFuture(unreachableTargetFuture)
+ .willNotCompleteWithin(willNotCompleteWithin);
} finally {
heartbeatManager.stop();
}
@@ -668,8 +659,8 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatManager.requestHeartbeat(someTargetId, null);
}
- assertThat(
- unreachableTargetFuture,
FlinkMatchers.willNotComplete(willNotCompleteWithin));
+ FlinkAssertions.assertThatFuture(unreachableTargetFuture)
+ .willNotCompleteWithin(willNotCompleteWithin);
} finally {
heartbeatManager.stop();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
index a3c25539b8d..5454e5e59db 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.testutils.FlinkMatchers;
import
org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
@@ -36,7 +35,6 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
-import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,13 +53,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Tests for {@link DefaultExecutionPlanStore} with {@link
TestingExecutionPlanStoreWatcher}, {@link
@@ -104,8 +97,8 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
final ExecutionPlan recoveredExecutionPlan =
executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());
- assertThat(recoveredExecutionPlan, is(notNullValue()));
- assertThat(recoveredExecutionPlan.getJobID(),
is(testingExecutionPlan.getJobID()));
+ assertThat(recoveredExecutionPlan).isNotNull();
+
assertThat(recoveredExecutionPlan.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
}
@Test
@@ -123,7 +116,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
final ExecutionPlan recoveredExecutionPlan =
executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());
- assertThat(recoveredExecutionPlan, is(nullValue()));
+ assertThat(recoveredExecutionPlan).isNull();
}
@Test
@@ -141,15 +134,13 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
final ExecutionPlanStore executionPlanStore =
createAndStartExecutionPlanStore(stateHandleStore);
- try {
-
executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());
- fail(
- "recoverExecutionPlan should fail when there is exception
in getting the state handle.");
- } catch (Exception ex) {
- assertThat(ex, FlinkMatchers.containsCause(testException));
- String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS);
- assertThat(actual, is(testingExecutionPlan.getJobID().toString()));
- }
+ assertThatThrownBy(
+ () ->
+ executionPlanStore.recoverExecutionPlan(
+ testingExecutionPlan.getJobID()))
+ .hasCause(testException);
+ String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS);
+ assertThat(testingExecutionPlan.getJobID()).hasToString(actual);
}
@Test
@@ -169,7 +160,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
executionPlanStore.putExecutionPlan(testingExecutionPlan);
final ExecutionPlan actual = addFuture.get(timeout,
TimeUnit.MILLISECONDS);
- assertThat(actual.getJobID(), is(testingExecutionPlan.getJobID()));
+
assertThat(actual.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
}
@Test
@@ -200,9 +191,9 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
final Tuple3<String, IntegerResourceVersion, ExecutionPlan> actual =
replaceFuture.get(timeout, TimeUnit.MILLISECONDS);
- assertThat(actual.f0, is(testingExecutionPlan.getJobID().toString()));
- assertThat(actual.f1,
is(IntegerResourceVersion.valueOf(resourceVersion)));
- assertThat(actual.f2.getJobID(), is(testingExecutionPlan.getJobID()));
+
assertThat(actual.f0).isEqualTo(testingExecutionPlan.getJobID().toString());
+
assertThat(actual.f1).isEqualTo(IntegerResourceVersion.valueOf(resourceVersion));
+
assertThat(actual.f2.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
}
@Test
@@ -221,7 +212,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
.globalCleanupAsync(testingExecutionPlan.getJobID(),
Executors.directExecutor())
.join();
final JobID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS);
- assertThat(actual, is(testingExecutionPlan.getJobID()));
+ assertThat(actual).isEqualTo(testingExecutionPlan.getJobID());
}
@Test
@@ -237,7 +228,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
.globalCleanupAsync(testingExecutionPlan.getJobID(),
Executors.directExecutor())
.join();
- assertThat(removeFuture.isDone(), is(true));
+ assertThat(removeFuture).isDone();
}
@Test
@@ -247,13 +238,14 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
final ExecutionPlanStore executionPlanStore =
createAndStartExecutionPlanStore(stateHandleStore);
- assertThrows(
- ExecutionException.class,
- () ->
- executionPlanStore
- .globalCleanupAsync(
- testingExecutionPlan.getJobID(),
Executors.directExecutor())
- .get());
+ assertThatThrownBy(
+ () ->
+ executionPlanStore
+ .globalCleanupAsync(
+
testingExecutionPlan.getJobID(),
+ Executors.directExecutor())
+ .get())
+ .isInstanceOf(ExecutionException.class);
}
@Test
@@ -270,7 +262,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
final ExecutionPlanStore executionPlanStore =
createAndStartExecutionPlanStore(stateHandleStore);
final Collection<JobID> jobIds = executionPlanStore.getJobIds();
- assertThat(jobIds, contains(existingJobIds.toArray()));
+ assertThat(jobIds).containsExactlyElementsOf(existingJobIds);
}
@Test
@@ -283,7 +275,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
executionPlanStore.putExecutionPlan(testingExecutionPlan);
testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());
-
assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(0));
+
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
}
@Test
@@ -303,8 +295,8 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
// Unknown job
final JobID unknownJobId = JobID.generate();
testingExecutionPlanStoreWatcher.addExecutionPlan(unknownJobId);
-
assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(1));
- assertThat(testingExecutionPlanListener.getAddedExecutionPlans(),
contains(unknownJobId));
+
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).hasSize(1);
+
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).contains(unknownJobId);
}
@Test
@@ -320,10 +312,9 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
testingExecutionPlanStoreWatcher.removeExecutionPlan(JobID.generate());
// Known job
testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());
-
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(),
is(1));
- assertThat(
- testingExecutionPlanListener.getRemovedExecutionPlans(),
- contains(testingExecutionPlan.getJobID()));
+
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).hasSize(1);
+ assertThat(testingExecutionPlanListener.getRemovedExecutionPlans())
+ .contains(testingExecutionPlan.getJobID());
}
@Test
@@ -334,7 +325,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
createAndStartExecutionPlanStore(stateHandleStore);
testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());
-
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(),
is(0));
+
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
}
@Test
@@ -347,7 +338,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
executionPlanStore.stop();
testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());
-
assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(0));
+
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
}
@Test
@@ -361,7 +352,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
executionPlanStore.stop();
testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());
-
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(),
is(0));
+
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
}
@Test
@@ -374,7 +365,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
createAndStartExecutionPlanStore(stateHandleStore);
executionPlanStore.stop();
- assertThat(completableFuture.isDone(), is(true));
+ assertThat(completableFuture).isDone();
}
@Test
@@ -390,7 +381,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
.join();
final String actual = releaseFuture.get();
- assertThat(actual, is(testingExecutionPlan.getJobID().toString()));
+ assertThat(testingExecutionPlan.getJobID()).hasToString(actual);
}
@Test
@@ -450,7 +441,7 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
JobResourceRequirements.readFromExecutionPlan(
Objects.requireNonNull(
(JobGraph)
executionPlanStore.recoverExecutionPlan(jobId)));
- Assertions.assertThat(maybeRecovered).get().isEqualTo(expected);
+ assertThat(maybeRecovered).get().isEqualTo(expected);
}
@Test
@@ -463,11 +454,11 @@ public class DefaultExecutionPlanStoreTest extends
TestLogger {
.build();
final ExecutionPlanStore executionPlanStore =
createAndStartExecutionPlanStore(stateHandleStore);
- assertThrows(
- NoSuchElementException.class,
- () ->
- executionPlanStore.putJobResourceRequirements(
- new JobID(), JobResourceRequirements.empty()));
+ assertThatThrownBy(
+ () ->
+ executionPlanStore.putJobResourceRequirements(
+ new JobID(),
JobResourceRequirements.empty()))
+ .isInstanceOf(NoSuchElementException.class);
}
private ExecutionPlanStore createAndStartExecutionPlanStore(
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index dea19ebb948..9746ea9bafb 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -44,13 +44,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
import static
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Tests for {@link CatalogTable} to {@link ResolvedCatalogTable}, {@link
CatalogMaterializedTable}
@@ -252,41 +250,30 @@ class CatalogBaseTableResolutionTest {
@Test
void testPropertyDeserializationError() {
- try {
- final Map<String, String> properties = catalogTableAsProperties();
- properties.remove("schema.4.data-type");
- CatalogTable.fromProperties(properties);
- fail("unknown failure");
- } catch (Exception e) {
- assertThat(e)
- .satisfies(
- matching(
- containsMessage(
- "Could not find property key
'schema.4.data-type'.")));
- }
+ assertThatThrownBy(
+ () -> {
+ final Map<String, String> properties =
catalogTableAsProperties();
+ properties.remove("schema.4.data-type");
+ CatalogTable.fromProperties(properties);
+ })
+ .hasRootCauseMessage("Could not find property key
'schema.4.data-type'.");
}
@Test
void testInvalidPartitionKeys() {
final CatalogTable catalogTable =
- CatalogTable.of(
- TABLE_SCHEMA,
- null,
- Arrays.asList("region", "countyINVALID"),
- Collections.emptyMap());
-
- try {
- resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable);
- fail("Invalid partition keys expected.");
- } catch (Exception e) {
- assertThat(e)
- .satisfies(
- matching(
- containsMessage(
- "Invalid partition key
'countyINVALID'. A partition key must "
- + "reference a physical
column in the schema. Available "
- + "columns are: [id,
region, county]")));
- }
+ CatalogTable.newBuilder()
+ .schema(TABLE_SCHEMA)
+ .comment(null)
+ .partitionKeys(Arrays.asList("region",
"countyINVALID"))
+ .options(Collections.emptyMap())
+ .build();
+
+ assertThatThrownBy(() ->
resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable))
+ .hasRootCauseMessage(
+ "Invalid partition key 'countyINVALID'. A partition
key must "
+ + "reference a physical column in the schema.
Available "
+ + "columns are: [id, region, county]");
}
@Test
@@ -317,18 +304,11 @@ class CatalogBaseTableResolutionTest {
Collections.emptyMap(),
null,
TableDistribution.ofHash(Collections.singletonList("countyINVALID"), 6));
- try {
- resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable);
- fail("Invalid bucket keys expected.");
- } catch (Exception e) {
- assertThat(e)
- .satisfies(
- matching(
- containsMessage(
- "Invalid bucket key
'countyINVALID'. A bucket key for a distribution must "
- + "reference a physical
column in the schema. "
- + "Available columns are:
[id, region, county]")));
- }
+ assertThatThrownBy(() ->
resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable))
+ .hasRootCauseMessage(
+ "Invalid bucket key 'countyINVALID'. A bucket key for
a distribution must "
+ + "reference a physical column in the schema. "
+ + "Available columns are: [id, region,
county]");
}
@Test
@@ -342,17 +322,10 @@ class CatalogBaseTableResolutionTest {
null,
TableDistribution.ofHash(Collections.singletonList("id"), 0));
- try {
- resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable);
- fail("Invalid bucket keys expected.");
- } catch (Exception e) {
- assertThat(e)
- .satisfies(
- matching(
- containsMessage(
- "Invalid bucket count '0'. The
number of buckets for a "
- + "distributed table must
be at least 1.")));
- }
+ assertThatThrownBy(() ->
resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable))
+ .hasRootCauseMessage(
+ "Invalid bucket count '0'. The number of buckets for a
"
+ + "distributed table must be at least 1.");
}
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 75f6fce9d3b..f69bc791c07 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.catalog;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.expressions.CallExpression;
@@ -47,8 +46,7 @@ import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRow
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link Schema}, {@link DefaultSchemaResolver}, and {@link
ResolvedSchema}. */
class SchemaResolutionTest {
@@ -434,12 +432,8 @@ class SchemaResolutionTest {
}
private static void testError(Schema schema, String errorMessage, boolean
isStreaming) {
- try {
- resolveSchema(schema, isStreaming);
- fail("Error message expected: " + errorMessage);
- } catch (Throwable t) {
-
assertThat(t).satisfies(matching(FlinkMatchers.containsMessage(errorMessage)));
- }
+ assertThatThrownBy(() -> resolveSchema(schema, isStreaming))
+ .hasMessageContaining(errorMessage);
}
private static ResolvedSchema resolveSchema(Schema schema) {
diff --git
a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
index 79880f76142..0115ad2285a 100644
---
a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
+++
b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.types.extraction.DataTypeExtractorTest._
import org.assertj.core.api.Assertions.assertThatThrownBy
-import org.assertj.core.api.HamcrestCondition
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -37,7 +36,7 @@ class DataTypeExtractorScalaTest {
def testScalaExtraction(testSpec: DataTypeExtractorTest.TestSpec): Unit = {
if (testSpec.hasErrorMessage) {
assertThatThrownBy(() => runExtraction(testSpec))
- .is(HamcrestCondition.matching(errorMatcher(testSpec)))
+ .hasRootCauseMessage(testSpec.expectedErrorMessage)
.isInstanceOf[ValidationException]
} else {
runExtraction(testSpec)
diff --git
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
index 22b7b6d5c64..f6a389b5a43 100644
---
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
+++
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.table.codesplit;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.util.FileUtils;
import org.junit.jupiter.api.Disabled;
@@ -28,7 +27,6 @@ import java.io.File;
import static org.apache.flink.table.codesplit.CodeSplitTestUtil.trimLines;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.HamcrestCondition.matching;
/** Tests for {@link JavaCodeSplitter}. */
class JavaCodeSplitterTest {
@@ -46,15 +44,12 @@ class JavaCodeSplitterTest {
@Test
@Disabled("Disabled in because of
https://issues.apache.org/jira/browse/FLINK-27702")
void testInvalidJavaCode() {
- try {
- JavaCodeSplitter.split("public class InvalidClass { return 1; }",
4000, 10000);
- } catch (Exception e) {
- assertThat(e)
- .satisfies(
- matching(
- FlinkMatchers.containsMessage(
- "JavaCodeSplitter failed. This is
a bug. Please file an issue.")));
- }
+ assertThatThrownBy(
+ () ->
+ JavaCodeSplitter.split(
+ "public class InvalidClass { return 1;
}", 4000, 10000))
+ .hasMessageContaining(
+ "JavaCodeSplitter failed. This is a bug. Please file
an issue.");
}
@Test
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index 21eed2f0277..5c27ae62a3e 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -44,7 +44,6 @@ import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
import org.apache.flink.types.Row;
-import org.hamcrest.Matcher;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -64,7 +63,6 @@ import java.util.function.Function;
import java.util.stream.Stream;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.table.test.TableAssertions.assertThat;
import static org.apache.flink.table.types.utils.DataTypeFactoryMock.dummyRaw;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -506,7 +504,7 @@ class DataTypeExtractorTest {
private @Nullable DataType expectedDataType;
- private @Nullable String expectedErrorMessage;
+ @Nullable String expectedErrorMessage;
private TestSpec(
@Nullable String description, Function<DataTypeFactory,
DataType> extractor) {
@@ -626,10 +624,6 @@ class DataTypeExtractorTest {
}
}
- static Matcher<Throwable> errorMatcher(TestSpec testSpec) {
- return containsCause(new
ValidationException(testSpec.expectedErrorMessage));
- }
-
/** Testing data type shared with the Scala tests. */
static DataType getSimplePojoDataType(Class<?> simplePojoClass) {
final StructuredType.Builder builder =
StructuredType.newBuilder(simplePojoClass);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
index 49e158a34f4..6f677a34a3e 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.codegen;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -57,7 +56,6 @@ import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
/** Tests for code generations with code splitting. */
class CodeSplitTest {
@@ -256,7 +254,7 @@ class CodeSplitTest {
consumer.accept(noSplitTableConfig);
fail("Expecting compiler exception");
} catch (Exception e) {
-
assertThat(e).satisfies(matching(FlinkMatchers.containsMessage("grows beyond 64
KB")));
+ assertThat(e).hasRootCauseMessage("Code grows beyond 64 KB");
} finally {
// set stdout back
System.setOut(originalStdOut);
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
index 44f030ddedf..f2df69a0989 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
@@ -34,10 +34,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Utilities for testing {@link SliceAssigner}s. */
abstract class SliceAssignerTestBase {
@@ -51,12 +49,7 @@ abstract class SliceAssignerTestBase {
}
protected static void assertErrorMessage(Runnable runnable, String
errorMessage) {
- try {
- runnable.run();
- fail("should fail.");
- } catch (Exception e) {
- assertThat(e).satisfies(matching(containsMessage(errorMessage)));
- }
+ assertThatThrownBy(runnable::run).hasMessageContaining(errorMessage);
}
protected static long assignSliceEnd(SliceAssigner assigner, long
timestamp) {
diff --git
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
index 8a8bb1cdebf..76abcbc43c5 100644
---
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
+++
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
@@ -77,6 +77,23 @@ public class FlinkCompletableFutureAssert<T>
(ThrowableAssertAlternative<T>) throwableAssert;
return cast;
}
+
+ /**
+ * Checks that the underlying throwable has cause of the given type
and returns a {@link
+ * ThrowableAssertAlternative} to chain further assertions on the
underlying throwable.
+ *
+ * @param cause the expected {@link Throwable} cause
+ * @param <T> the expected {@link Throwable} cause
+ * @return a {@link ThrowableAssertAlternative} built with underlying
throwable.
+ */
+ public <T extends Throwable> ThrowableAssertAlternative<T>
withCauseOfType(Class<T> cause) {
+ final ThrowableAssertAlternative<Throwable> throwableAssert =
+ new
ThrowableAssertAlternative<>(throwable).withCauseInstanceOf(cause);
+ @SuppressWarnings("unchecked")
+ final ThrowableAssertAlternative<T> cast =
+ (ThrowableAssertAlternative<T>) throwableAssert;
+ return cast;
+ }
}
FlinkCompletableFutureAssert(CompletableFuture<T> actual) {
@@ -121,8 +138,6 @@ public class FlinkCompletableFutureAssert<T>
/**
* Assert that {@link CompletableFuture} will not complete within a fixed
duration.
*
- * <p>This is a replacement for {@link
FlinkMatchers#willNotComplete(Duration)} in assertj.
- *
* @return {@code this} assertion object.
*/
public FlinkCompletableFutureAssert<T> willNotCompleteWithin(Duration
duration) {
diff --git
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
deleted file mode 100644
index 2eaa65ae04a..00000000000
---
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.testutils;
-
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.time.Duration;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
-import java.util.function.Predicate;
-
-/**
- * Some reusable hamcrest matchers for Flink.
- *
- * @deprecated You should assertj assertions, which have built-in assertions
for {@link
- * CompletableFuture}. To check chains of {@link Throwable} causes, use
{@link
- * FlinkAssertions#anyCauseMatches(String)} or {@link
FlinkAssertions#anyCauseMatches(Class,
- * String)}
- */
-@Deprecated
-public class FlinkMatchers {
-
- // ------------------------------------------------------------------------
- // factories
- // ------------------------------------------------------------------------
-
- /**
- * Checks whether {@link CompletableFuture} completed already
exceptionally with a specific
- * exception type.
- */
- public static <T, E extends Throwable> FutureFailedMatcher<T>
futureFailedWith(
- Class<E> exceptionType) {
- Objects.requireNonNull(exceptionType, "exceptionType should not be
null");
- return new FutureFailedMatcher<>(exceptionType);
- }
-
- /**
- * Checks whether {@link CompletableFuture} will completed exceptionally
within a certain time.
- */
- public static <T, E extends Throwable> FutureWillFailMatcher<T>
futureWillCompleteExceptionally(
- Class<E> exceptionType, Duration timeout) {
- Objects.requireNonNull(exceptionType, "exceptionType should not be
null");
- Objects.requireNonNull(timeout, "timeout should not be null");
- return new FutureWillFailMatcher<>(exceptionType, timeout);
- }
-
- /**
- * Checks whether {@link CompletableFuture} will completed exceptionally
within a certain time.
- */
- public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(
- Function<Throwable, Boolean> exceptionCheck,
- Duration timeout,
- String checkDescription) {
- Objects.requireNonNull(exceptionCheck, "exceptionType should not be
null");
- Objects.requireNonNull(timeout, "timeout should not be null");
- return new FutureWillFailMatcher<>(exceptionCheck, timeout,
checkDescription);
- }
-
- /**
- * Checks whether {@link CompletableFuture} will completed exceptionally
within a certain time.
- */
- public static <T> FutureWillFailMatcher<T>
futureWillCompleteExceptionally(Duration timeout) {
- return futureWillCompleteExceptionally(Throwable.class, timeout);
- }
-
- /** Checks for a {@link Throwable} that matches by class. */
- public static Matcher<Throwable> containsCause(Class<? extends Throwable>
failureCause) {
- return new ContainsCauseMatcher(failureCause);
- }
-
- /** Checks for a {@link Throwable} that matches by class and message. */
- public static Matcher<Throwable> containsCause(Throwable failureCause) {
- return new ContainsCauseAndMessageMatcher(failureCause);
- }
-
- /** Checks for a {@link Throwable} that contains the expected error
message. */
- public static Matcher<Throwable> containsMessage(String errorMessage) {
- return new ContainsMessageMatcher(errorMessage);
- }
-
- /** Checks that a {@link CompletableFuture} won't complete within the
given timeout. */
- public static Matcher<CompletableFuture<?>> willNotComplete(Duration
timeout) {
- return new WillNotCompleteMatcher(timeout);
- }
-
- // ------------------------------------------------------------------------
-
- /** This class should not be instantiated. */
- private FlinkMatchers() {}
-
- // ------------------------------------------------------------------------
- // matcher implementations
- // ------------------------------------------------------------------------
-
- private static final class FutureFailedMatcher<T>
- extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
-
- private final Class<? extends Throwable> expectedException;
-
- FutureFailedMatcher(Class<? extends Throwable> expectedException) {
- super(CompletableFuture.class);
- this.expectedException = expectedException;
- }
-
- @Override
- protected boolean matchesSafely(
- CompletableFuture<T> future, Description mismatchDescription) {
- if (!future.isDone()) {
- mismatchDescription.appendText("Future is not completed.");
- return false;
- }
-
- if (!future.isCompletedExceptionally()) {
- Object result = future.getNow(null);
- assert result != null;
- mismatchDescription.appendText(
- "Future did not complete exceptionally, but instead
regularly with: "
- + result);
- return false;
- }
-
- try {
- future.getNow(null);
- throw new Error();
- } catch (CompletionException e) {
- if (e.getCause() != null
- &&
expectedException.isAssignableFrom(e.getCause().getClass())) {
- return true;
- }
-
- mismatchDescription.appendText(
- "Future completed with different exception: " +
e.getCause());
- return false;
- }
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText(
- "A CompletableFuture that failed with: " +
expectedException.getName());
- }
- }
-
- private static final class FutureWillFailMatcher<T>
- extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
-
- private final Function<Throwable, Boolean> exceptionValidator;
-
- private final Duration timeout;
-
- private final String validationDescription;
-
- FutureWillFailMatcher(Class<? extends Throwable> expectedException,
Duration timeout) {
-
- super(CompletableFuture.class);
- this.exceptionValidator = (e) ->
expectedException.isAssignableFrom(e.getClass());
- this.timeout = timeout;
- this.validationDescription = expectedException.getName();
- }
-
- FutureWillFailMatcher(
- Function<Throwable, Boolean> exceptionValidator,
- Duration timeout,
- String validationDescription) {
-
- super(CompletableFuture.class);
- this.exceptionValidator = exceptionValidator;
- this.timeout = timeout;
- this.validationDescription = validationDescription;
- }
-
- @Override
- protected boolean matchesSafely(
- CompletableFuture<T> future, Description mismatchDescription) {
- try {
- final Object result = future.get(timeout.toMillis(),
TimeUnit.MILLISECONDS);
- mismatchDescription.appendText(
- "Future did not complete exceptionally, but instead
regularly with: "
- + result);
- return false;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new Error("interrupted test");
- } catch (TimeoutException e) {
- mismatchDescription.appendText(
- "Future did not complete withing " +
timeout.toMillis() + " milliseconds.");
- return false;
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- if (cause != null && exceptionValidator.apply(cause)) {
- return true;
- }
-
- String otherDescription = "(null)";
- if (cause != null) {
- final StringWriter stm = new StringWriter();
- try (PrintWriter wrt = new PrintWriter(stm)) {
- cause.printStackTrace(wrt);
- }
- otherDescription = stm.toString();
- }
-
- mismatchDescription.appendText(
- "Future completed with different exception: " +
otherDescription);
- return false;
- }
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText(
- "A CompletableFuture that will have failed within "
- + timeout.toMillis()
- + " milliseconds with: "
- + validationDescription);
- }
- }
-
- private static final class ContainsCauseMatcher extends
TypeSafeDiagnosingMatcher<Throwable> {
-
- private final Class<? extends Throwable> failureCause;
-
- private ContainsCauseMatcher(Class<? extends Throwable> failureCause) {
- this.failureCause = failureCause;
- }
-
- @Override
- protected boolean matchesSafely(Throwable throwable, Description
description) {
- final Optional<Throwable> optionalCause =
- findThrowable(throwable, cause -> cause.getClass() ==
failureCause);
-
- if (!optionalCause.isPresent()) {
- description
- .appendText("The throwable ")
- .appendValue(throwable)
- .appendText(" does not contain the expected failure
cause ")
- .appendValue(failureCause.getSimpleName());
- }
-
- return optionalCause.isPresent();
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("Expected failure cause is ")
- .appendValue(failureCause.getSimpleName());
- }
- }
-
- private static final class ContainsCauseAndMessageMatcher
- extends TypeSafeDiagnosingMatcher<Throwable> {
-
- private final Throwable failureCause;
-
- private ContainsCauseAndMessageMatcher(Throwable failureCause) {
- this.failureCause = failureCause;
- }
-
- @Override
- protected boolean matchesSafely(Throwable throwable, Description
description) {
- final Optional<Throwable> optionalCause =
- findThrowable(
- throwable,
- cause ->
- cause.getClass() == failureCause.getClass()
- && cause.getMessage()
-
.equals(failureCause.getMessage()));
-
- if (!optionalCause.isPresent()) {
- description
- .appendText("The throwable ")
- .appendValue(throwable)
- .appendText(" does not contain the expected failure
cause ")
- .appendValue(failureCause);
- }
-
- return optionalCause.isPresent();
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("Expected failure cause is
").appendValue(failureCause);
- }
- }
-
- private static final class ContainsMessageMatcher extends
TypeSafeDiagnosingMatcher<Throwable> {
-
- private final String errorMessage;
-
- private ContainsMessageMatcher(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- @Override
- protected boolean matchesSafely(Throwable throwable, Description
description) {
- final Optional<Throwable> optionalCause =
- findThrowable(throwable, this::containsErrorMessage);
-
- if (!optionalCause.isPresent()) {
- description
- .appendText("The throwable ")
- .appendValue(throwable)
- .appendText(" does not contain the expected error
message ")
- .appendValue(errorMessage);
- }
-
- return optionalCause.isPresent();
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("Expected error message is
").appendValue(errorMessage);
- }
-
- private boolean containsErrorMessage(Throwable t) {
- return t.getMessage() != null &&
t.getMessage().contains(errorMessage);
- }
- }
-
- // copied from flink-core to not mess up the dependency design too much,
just for a little
- // utility method
- private static Optional<Throwable> findThrowable(
- Throwable throwable, Predicate<Throwable> predicate) {
- if (throwable == null || predicate == null) {
- return Optional.empty();
- }
-
- Throwable t = throwable;
- while (t != null) {
- if (predicate.test(t)) {
- return Optional.of(t);
- } else {
- t = t.getCause();
- }
- }
-
- return Optional.empty();
- }
-
- private static final class WillNotCompleteMatcher
- extends TypeSafeDiagnosingMatcher<CompletableFuture<?>> {
-
- private final Duration timeout;
-
- private WillNotCompleteMatcher(Duration timeout) {
- this.timeout = timeout;
- }
-
- @Override
- protected boolean matchesSafely(
- CompletableFuture<?> item, Description mismatchDescription) {
-
- try {
- final Object value = item.get(timeout.toMillis(),
TimeUnit.MILLISECONDS);
- mismatchDescription
- .appendText("The given future completed with ")
- .appendValue(value);
- } catch (TimeoutException timeoutException) {
- return true;
- } catch (InterruptedException e) {
- mismatchDescription.appendText("The waiting thread was
interrupted.");
- } catch (ExecutionException e) {
- mismatchDescription
- .appendText("The given future was completed
exceptionally: ")
- .appendValue(e);
- }
-
- return false;
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("The given future should not complete within ")
- .appendValue(timeout.toMillis())
- .appendText(" ms.");
- }
- }
-}
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
index 9e5638ccc4e..965d1ebef6b 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
@@ -36,11 +36,11 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.util.TestLogger;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Integration tests for the {@link JobMaster}. */
public class JobMasterITCase extends TestLogger {
@@ -58,9 +58,9 @@ public class JobMasterITCase extends TestLogger {
try {
miniCluster.getMiniCluster().submitJob(jobGraph).get();
- fail("Expect failure");
+ Assertions.fail("Expect failure");
} catch (Throwable t) {
- assertThat(t, containsMessage("The given job is empty"));
+ assertThat(t).hasRootCauseMessage("The given job is empty");
} finally {
miniCluster.after();
}
@@ -85,11 +85,7 @@ public class JobMasterITCase extends TestLogger {
see.fromSource(mySource, WatermarkStrategy.noWatermarks(),
"MySourceName");
stream.sinkTo(new DiscardingSink<>());
- try {
- see.execute();
- } catch (Exception e) {
- assertThat(e, containsMessage("Context was not yet initialized"));
- }
+ assertThatThrownBy(see::execute).hasRootCauseMessage("Context was not
yet initialized");
}
private static class FailOnInitializationSource implements Source<String,
MockSplit, Void> {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index f15cb467032..e2136169728 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -58,12 +58,12 @@ import
org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
import
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
-import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -84,15 +84,9 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.util.ExceptionUtils.assertThrowable;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.either;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
/** Integration tests for the adaptive scheduler. */
public class AdaptiveSchedulerITCase extends TestLogger {
@@ -124,7 +118,7 @@ public class AdaptiveSchedulerITCase extends TestLogger {
@Before
public void ensureAdaptiveSchedulerEnabled() {
- assumeTrue(ClusterOptions.isAdaptiveSchedulerEnabled(configuration));
+
assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
}
@After
@@ -172,8 +166,8 @@ public class AdaptiveSchedulerITCase extends TestLogger {
savepointDirectory.getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();
- assertThat(savepoint,
containsString(savepointDirectory.getAbsolutePath()));
- assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+ assertThat(savepoint).contains(savepointDirectory.getAbsolutePath());
+ assertThat(client.getJobStatus().get()).isSameAs(JobStatus.FINISHED);
}
@Test
@@ -187,16 +181,14 @@ public class AdaptiveSchedulerITCase extends TestLogger {
JobClient client = env.executeAsync();
DummySource.awaitRunning();
- try {
- client.stopWithSavepoint(
- false,
-
tempFolder.newFolder("savepoint").getAbsolutePath(),
- SavepointFormatType.CANONICAL)
- .get();
- fail("Expect exception");
- } catch (ExecutionException e) {
- assertThat(e, containsCause(FlinkException.class));
- }
+ assertThatThrownBy(
+ () ->
+ client.stopWithSavepoint(
+ false,
+
tempFolder.newFolder("savepoint").getAbsolutePath(),
+ SavepointFormatType.CANONICAL)
+ .get())
+ .hasCauseInstanceOf(FlinkException.class);
// expect job to run again (maybe restart)
CommonTestUtils.waitUntilCondition(() -> client.getJobStatus().get()
== JobStatus.RUNNING);
}
@@ -217,18 +209,18 @@ public class AdaptiveSchedulerITCase extends TestLogger {
false,
tempFolder.newFolder("savepoint").getAbsolutePath(),
SavepointFormatType.CANONICAL);
- final Throwable savepointException =
- assertThrows(ExecutionException.class,
savepointCompleted::get).getCause();
- assertThrowable(
- savepointException,
- throwable ->
- throwable instanceof StopWithSavepointStoppingException
- && throwable
- .getMessage()
- .startsWith("A savepoint has been
created at: "));
- assertThat(
- client.getJobStatus().get(),
- either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING)));
+ assertThatThrownBy(savepointCompleted::get)
+ .isInstanceOf(ExecutionException.class)
+ .satisfies(
+ e ->
+ assertThat(
+ ExceptionUtils.findThrowable(
+ e,
+
StopWithSavepointStoppingException
+ .class)
+ .get())
+ .hasMessageContaining("A savepoint has
been created at: "));
+ assertThat(client.getJobStatus().get()).isIn(JobStatus.FAILED,
JobStatus.FAILING);
}
@Test
@@ -248,16 +240,14 @@ public class AdaptiveSchedulerITCase extends TestLogger {
DummySource.awaitRunning();
DummySource.resetForParallelism(PARALLELISM);
final File savepointDirectory = tempFolder.newFolder("savepoint");
- try {
- client.stopWithSavepoint(
- false,
- savepointDirectory.getAbsolutePath(),
- SavepointFormatType.CANONICAL)
- .get();
- fail("Expect failure of operation");
- } catch (ExecutionException e) {
- assertThat(e, containsCause(FlinkException.class));
- }
+ assertThatThrownBy(
+ () ->
+ client.stopWithSavepoint(
+ false,
+
savepointDirectory.getAbsolutePath(),
+ SavepointFormatType.CANONICAL)
+ .get())
+ .hasCauseInstanceOf(FlinkException.class);
DummySource.awaitRunning();
@@ -273,7 +263,7 @@ public class AdaptiveSchedulerITCase extends TestLogger {
savepointDirectory.getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();
- assertThat(savepoint,
containsString(savepointDirectory.getAbsolutePath()));
+ assertThat(savepoint).contains(savepointDirectory.getAbsolutePath());
}
@Test
@@ -335,22 +325,20 @@ public class AdaptiveSchedulerITCase extends TestLogger {
// there should be exactly 1 root exception in the history from the
failing vertex,
// as the global coordinator failure should be treated as a concurrent
exception
- Assertions.assertThat(jobExceptions.getExceptionHistory().getEntries())
+ assertThat(jobExceptions.getExceptionHistory().getEntries())
.hasSize(1)
.allSatisfy(
rootExceptionInfo ->
-
Assertions.assertThat(rootExceptionInfo.getStacktrace())
+ assertThat(rootExceptionInfo.getStacktrace())
.contains(FailingInvokable.localExceptionMsg)
.doesNotContain(
FailingCoordinatorProvider.globalExceptionMsg))
.allSatisfy(
rootExceptionInfo ->
-
Assertions.assertThat(rootExceptionInfo.getConcurrentExceptions())
+
assertThat(rootExceptionInfo.getConcurrentExceptions())
.anySatisfy(
exceptionInfo ->
- Assertions.assertThat(
-
exceptionInfo
-
.getStacktrace())
+
assertThat(exceptionInfo.getStacktrace())
.contains(
FailingCoordinatorProvider
.globalExceptionMsg)));