This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bc9a99fa061 [FLINK-36948][tests] Remove deprecated FlinkMatchers
bc9a99fa061 is described below

commit bc9a99fa061e4cd2f003742442c7151bfa141c30
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Dec 26 20:33:55 2024 +0100

    [FLINK-36948][tests] Remove deprecated FlinkMatchers
---
 .../DispatcherCachedOperationsHandlerTest.java     |  66 ++--
 .../dispatcher/DispatcherCleanupITCase.java        |  86 ++---
 .../dispatcher/DispatcherResourceCleanupTest.java  |  25 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 223 +++++------
 .../DispatcherResourceManagerComponentTest.java    |   9 +-
 .../runtime/heartbeat/HeartbeatManagerTest.java    |  77 ++--
 .../jobmanager/DefaultExecutionPlanStoreTest.java  |  97 +++--
 .../catalog/CatalogBaseTableResolutionTest.java    |  85 ++---
 .../flink/table/catalog/SchemaResolutionTest.java  |  12 +-
 .../extraction/DataTypeExtractorScalaTest.scala    |   3 +-
 .../table/codesplit/JavaCodeSplitterTest.java      |  17 +-
 .../types/extraction/DataTypeExtractorTest.java    |   8 +-
 .../flink/table/planner/codegen/CodeSplitTest.java |   4 +-
 .../window/tvf/slicing/SliceAssignerTestBase.java  |  11 +-
 .../testutils/FlinkCompletableFutureAssert.java    |  19 +-
 .../apache/flink/core/testutils/FlinkMatchers.java | 406 ---------------------
 .../flink/runtime/jobmaster/JobMasterITCase.java   |  16 +-
 .../test/scheduling/AdaptiveSchedulerITCase.java   |  92 ++---
 18 files changed, 359 insertions(+), 897 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
index 7768e36724c..889f44d846d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
 import org.apache.flink.runtime.rest.handler.async.OperationResult;
@@ -41,11 +41,8 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link DispatcherCachedOperationsHandler} component. */
 public class DispatcherCachedOperationsHandlerTest extends TestLogger {
@@ -124,18 +121,17 @@ public class DispatcherCachedOperationsHandlerTest 
extends TestLogger {
                         TriggerSavepointMode.SAVEPOINT,
                         TIMEOUT);
 
-        assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1));
-        assertThat(
-                triggerSavepointFunction.getInvocationParameters().get(0),
-                is(
+        assertThat(triggerSavepointFunction.getNumberOfInvocations()).isOne();
+        assertThat(triggerSavepointFunction.getInvocationParameters().get(0))
+                .isEqualTo(
                         new Tuple4<>(
                                 jobID,
                                 targetDirectory,
                                 SavepointFormatType.CANONICAL,
-                                TriggerSavepointMode.SAVEPOINT)));
+                                TriggerSavepointMode.SAVEPOINT));
 
-        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
-        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get());
+        assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get());
     }
 
     @Test
@@ -155,18 +151,17 @@ public class DispatcherCachedOperationsHandlerTest 
extends TestLogger {
                         TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT,
                         TIMEOUT);
 
-        assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1));
-        assertThat(
-                stopWithSavepointFunction.getInvocationParameters().get(0),
-                is(
+        assertThat(stopWithSavepointFunction.getNumberOfInvocations()).isOne();
+        assertThat(stopWithSavepointFunction.getInvocationParameters().get(0))
+                .isEqualTo(
                         new Tuple4<>(
                                 jobID,
                                 targetDirectory,
                                 SavepointFormatType.CANONICAL,
-                                
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT)));
+                                
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT));
 
-        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
-        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get());
+        assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get());
     }
 
     @Test
@@ -190,23 +185,22 @@ public class DispatcherCachedOperationsHandlerTest 
extends TestLogger {
                 .get();
 
         // should not complete because we wait for the result to be accessed
-        assertThat(
-                savepointTriggerCache.closeAsync(),
-                FlinkMatchers.willNotComplete(Duration.ofMillis(10)));
+        FlinkAssertions.assertThatFuture(savepointTriggerCache.closeAsync())
+                .willNotCompleteWithin(Duration.ofMillis(10));
     }
 
     @Test
     public void throwsIfCacheIsShuttingDown() {
         savepointTriggerCache.closeAsync();
-        assertThrows(
-                IllegalStateException.class,
-                () ->
-                        handler.triggerSavepoint(
-                                operationKey,
-                                targetDirectory,
-                                SavepointFormatType.CANONICAL,
-                                TriggerSavepointMode.SAVEPOINT,
-                                TIMEOUT));
+        assertThatThrownBy(
+                        () ->
+                                handler.triggerSavepoint(
+                                        operationKey,
+                                        targetDirectory,
+                                        SavepointFormatType.CANONICAL,
+                                        TriggerSavepointMode.SAVEPOINT,
+                                        TIMEOUT))
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
@@ -224,15 +218,17 @@ public class DispatcherCachedOperationsHandlerTest 
extends TestLogger {
         CompletableFuture<OperationResult<String>> statusFuture =
                 handler.getSavepointStatus(operationKey);
 
-        assertEquals(statusFuture.get(), 
OperationResult.success(savepointLocation));
+        
assertThat(statusFuture.get()).isEqualTo(OperationResult.success(savepointLocation));
     }
 
     @Test
-    public void getStatusFailsIfKeyUnknown() throws InterruptedException {
+    public void getStatusFailsIfKeyUnknown() {
         CompletableFuture<OperationResult<String>> statusFuture =
                 handler.getSavepointStatus(operationKey);
 
-        assertThat(statusFuture, 
futureFailedWith(UnknownOperationKeyException.class));
+        FlinkAssertions.assertThatFuture(statusFuture)
+                .eventuallyFails()
+                .withCauseOfType(UnknownOperationKeyException.class);
     }
 
     private abstract static class TriggerCheckpointSpyFunction
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 92725c4b09d..d07a38156d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CleanupOptions;
 import org.apache.flink.core.execution.RecoveryClaimMode;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
@@ -53,10 +52,7 @@ import 
org.apache.flink.runtime.testutils.TestingExecutionPlanStore;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -65,16 +61,14 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** An integration test for various fail-over scenarios of the {@link 
Dispatcher} component. */
 public class DispatcherCleanupITCase extends AbstractDispatcherTest {
@@ -95,8 +89,8 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
                                 // First job cleanup still succeeded for the
                                 // CompletedCheckpointStore because the 
JobGraph cleanup happens
                                 // after the JobManagerRunner closing
-                                
assertTrue(previous.getShutdownStatus().isPresent());
-                                
assertTrue(previous.getAllCheckpoints().isEmpty());
+                                
assertThat(previous.getShutdownStatus()).isPresent();
+                                
assertThat(previous.getAllCheckpoints()).isEmpty();
                                 return new EmbeddedCompletedCheckpointStore(
                                         maxCheckpoints,
                                         previous.getAllCheckpoints(),
@@ -191,12 +185,11 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
 
         successfulCleanupLatch.await();
 
-        assertThat(actualGlobalCleanupCallCount.get(), equalTo(numberOfErrors 
+ 1));
+        
assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(numberOfErrors + 1);
 
-        assertThat(
-                "The JobGraph should be removed from ExecutionPlanStore.",
-                haServices.getExecutionPlanStore().getJobIds(),
-                IsEmptyCollection.empty());
+        assertThat(haServices.getExecutionPlanStore().getJobIds())
+                .as("The JobGraph should be removed from ExecutionPlanStore.")
+                .isEmpty();
 
         CommonTestUtils.waitUntilCondition(
                 () -> 
haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get());
@@ -233,20 +226,15 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
 
         CommonTestUtils.waitUntilCondition(() -> jobManagerRunnerEntry.get() 
!= null);
 
-        assertThat(
-                "The JobResultStore should have this job still marked as 
dirty.",
-                
haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get(),
-                CoreMatchers.is(true));
+        
assertThat(haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get())
+                .as("The JobResultStore should have this job still marked as 
dirty.")
+                .isTrue();
 
         final DispatcherGateway dispatcherGateway =
                 dispatcher.getSelfGateway(DispatcherGateway.class);
 
-        try {
-            dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
-            Assert.fail("Should fail because cancelling the cleanup is not 
allowed.");
-        } catch (ExecutionException e) {
-            assertThat(e, 
FlinkMatchers.containsCause(JobCancellationFailedException.class));
-        }
+        assertThatThrownBy(() -> dispatcherGateway.cancelJob(jobId, 
TIMEOUT).get())
+                .hasCauseInstanceOf(JobCancellationFailedException.class);
         jobManagerRunnerCleanupFuture.complete(null);
 
         CommonTestUtils.waitUntilCondition(
@@ -305,25 +293,22 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
         waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, 
jobId);
         firstCleanupTriggered.await();
 
-        assertThat(
-                "The cleanup should have been triggered only once.",
-                actualGlobalCleanupCallCount.get(),
-                equalTo(1));
-        assertThat(
-                "The cleanup should not have reached the successful cleanup 
code path.",
-                successfulJobGraphCleanup.isDone(),
-                equalTo(false));
+        assertThat(actualGlobalCleanupCallCount.get())
+                .as("The cleanup should have been triggered only once.")
+                .isOne();
+        assertThat(successfulJobGraphCleanup.isDone())
+                .as("The cleanup should not have reached the successful 
cleanup code path.")
+                .isFalse();
 
+        assertThat(haServices.getExecutionPlanStore().getJobIds())
+                .as("The JobGraph is still stored in the ExecutionPlanStore.")
+                .containsExactly(jobId);
         assertThat(
-                "The JobGraph is still stored in the ExecutionPlanStore.",
-                haServices.getExecutionPlanStore().getJobIds(),
-                equalTo(Collections.singleton(jobId)));
-        assertThat(
-                "The JobResultStore should have this job marked as dirty.",
-                haServices.getJobResultStore().getDirtyResults().stream()
-                        .map(JobResult::getJobId)
-                        .collect(Collectors.toSet()),
-                equalTo(Collections.singleton(jobId)));
+                        
haServices.getJobResultStore().getDirtyResults().stream()
+                                .map(JobResult::getJobId)
+                                .collect(Collectors.toSet()))
+                .as("The JobResultStore should have this job marked as dirty.")
+                .containsExactly(jobId);
 
         // Run a second dispatcher, that restores our finished job.
         final Dispatcher secondDispatcher =
@@ -338,17 +323,16 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
         CommonTestUtils.waitUntilCondition(
                 () -> 
haServices.getJobResultStore().getDirtyResults().isEmpty());
 
-        assertThat(
-                "The JobGraph is not stored in the ExecutionPlanStore.",
-                haServices.getExecutionPlanStore().getJobIds(),
-                IsEmptyCollection.empty());
-        assertTrue(
-                "The JobResultStore has the job listed as clean.",
-                
haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get());
+        assertThat(haServices.getExecutionPlanStore().getJobIds())
+                .as("The JobGraph is not stored in the ExecutionPlanStore.")
+                .isEmpty();
+        
assertThat(haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get())
+                .as("The JobResultStore has the job listed as clean.")
+                .isTrue();
 
-        assertThat(successfulJobGraphCleanup.get(), equalTo(jobId));
+        assertThat(successfulJobGraphCleanup.get()).isEqualTo(jobId);
 
-        assertThat(actualGlobalCleanupCallCount.get(), equalTo(2));
+        assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(2);
     }
 
     private void waitForJobToFinish(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index cae270a84ed..09c3be44bae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -84,9 +84,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static 
org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -272,12 +271,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
         startDispatcher(new FailingJobManagerRunnerFactory(new 
FlinkException("Test exception")));
         final CompletableFuture<Acknowledge> submissionFuture = submitJob();
 
-        try {
-            submissionFuture.get();
-            fail("Job submission was expected to fail.");
-        } catch (ExecutionException ee) {
-            assertThat(ee, containsCause(JobSubmissionException.class));
-        }
+        
assertThatThrownBy(submissionFuture::get).hasCauseInstanceOf(JobSubmissionException.class);
 
         assertGlobalCleanupTriggered(jobId);
     }
@@ -647,15 +641,12 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
 
         // submit and fail during job master runner construction
         queue.offer(Optional.of(testException));
-        try {
-            dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1)).get();
-            fail("A FlinkException is expected");
-        } catch (Throwable expectedException) {
-            assertThat(expectedException, containsCause(FlinkException.class));
-            assertThat(expectedException, 
containsMessage(testException.getMessage()));
-            // make sure we've cleaned up in correct order (including HA)
-            assertGlobalCleanupTriggered(jobId);
-        }
+        assertThatThrownBy(() -> dispatcherGateway.submitJob(jobGraph, 
Duration.ofMinutes(1)).get())
+                .hasCauseInstanceOf(FlinkException.class)
+                .hasRootCauseMessage(testException.getMessage());
+
+        // make sure we've cleaned up in correct order (including HA)
+        assertGlobalCleanupTriggered(jobId);
 
         // don't fail this time
         queue.offer(Optional.empty());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index dbb8d652b28..a1d44b3712e 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.testutils.FlinkAssertions;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -107,7 +106,6 @@ import 
org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import org.assertj.core.api.Assertions;
-import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -144,16 +142,8 @@ import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for the {@link Dispatcher} component. */
 public class DispatcherTest extends AbstractDispatcherTest {
@@ -219,9 +209,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
 
         jobMasterLeaderElection.getStartFuture().get();
 
-        assertTrue(
-                "jobManagerRunner was not started",
-                jobMasterLeaderElection.getStartFuture().isDone());
+        assertThat(jobMasterLeaderElection.getStartFuture())
+                .as("jobManagerRunner was not started")
+                .isDone();
     }
 
     @Test
@@ -290,11 +280,11 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 .eventuallySucceeds();
 
         // verify that all but one submission failed as duplicates
-        Assertions.assertThat(exceptions)
+        assertThat(exceptions)
                 .hasSize(numThreads - 1)
                 .allSatisfy(
                         t ->
-                                Assertions.assertThat(t)
+                                assertThat(t)
                                         
.hasCauseInstanceOf(DuplicateJobSubmissionException.class));
     }
 
@@ -308,12 +298,14 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 dispatcher.getSelfGateway(DispatcherGateway.class);
         final CompletableFuture<Acknowledge> submitFuture =
                 dispatcherGateway.submitJob(jobGraph, TIMEOUT);
-        final ExecutionException executionException =
-                assertThrows(ExecutionException.class, submitFuture::get);
-        assertTrue(executionException.getCause() instanceof 
DuplicateJobSubmissionException);
-        final DuplicateJobSubmissionException duplicateException =
-                (DuplicateJobSubmissionException) 
executionException.getCause();
-        assertTrue(duplicateException.isGloballyTerminated());
+        assertThatThrownBy(submitFuture::get)
+                .hasCauseInstanceOf(DuplicateJobSubmissionException.class)
+                .satisfies(
+                        e ->
+                                assertThat(
+                                                
((DuplicateJobSubmissionException) e.getCause())
+                                                        
.isGloballyTerminated())
+                                        .isTrue());
     }
 
     @Test
@@ -328,12 +320,14 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 dispatcher.getSelfGateway(DispatcherGateway.class);
         final CompletableFuture<Acknowledge> submitFuture =
                 dispatcherGateway.submitJob(jobGraph, TIMEOUT);
-        final ExecutionException executionException =
-                assertThrows(ExecutionException.class, submitFuture::get);
-        assertTrue(executionException.getCause() instanceof 
DuplicateJobSubmissionException);
-        final DuplicateJobSubmissionException duplicateException =
-                (DuplicateJobSubmissionException) 
executionException.getCause();
-        assertFalse(duplicateException.isGloballyTerminated());
+        assertThatThrownBy(submitFuture::get)
+                .hasCauseInstanceOf(DuplicateJobSubmissionException.class)
+                .satisfies(
+                        e ->
+                                assertThat(
+                                                
((DuplicateJobSubmissionException) e.getCause())
+                                                        
.isGloballyTerminated())
+                                        .isFalse());
     }
 
     /**
@@ -364,12 +358,8 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
         CompletableFuture<Acknowledge> acknowledgeFuture =
                 dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT);
 
-        try {
-            acknowledgeFuture.get();
-            fail("job submission should have failed");
-        } catch (ExecutionException e) {
-            assertTrue(ExceptionUtils.findThrowable(e, 
JobSubmissionException.class).isPresent());
-        }
+        assertThatThrownBy(() -> acknowledgeFuture.get())
+                .hasCauseInstanceOf(JobSubmissionException.class);
     }
 
     @Test
@@ -384,15 +374,14 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         blockingJobMaster.waitForBlockingInit();
 
         // ensure INITIALIZING status
-        assertThat(
-                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
-                is(JobStatus.INITIALIZING));
+        assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+                .isSameAs(JobStatus.INITIALIZING);
 
         // ensure correct JobDetails
         MultipleJobsDetails multiDetails =
                 dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
-        assertEquals(1, multiDetails.getJobs().size());
-        assertEquals(jobId, 
multiDetails.getJobs().iterator().next().getJobId());
+        assertThat(multiDetails.getJobs()).hasSize(1);
+        
assertThat(multiDetails.getJobs().iterator().next().getJobId()).isEqualTo(jobId);
 
         // let the initialization finish.
         blockingJobMaster.unblockJobMasterInitialization();
@@ -410,24 +399,21 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-        assertThat(
-                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
-                is(JobStatus.INITIALIZING));
+        assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+                .isSameAs(JobStatus.INITIALIZING);
 
         // this call is supposed to fail
-        try {
-            dispatcherGateway
-                    .triggerSavepointAndGetLocation(
-                            jobId,
-                            "file:///tmp/savepoint",
-                            SavepointFormatType.CANONICAL,
-                            TriggerSavepointMode.SAVEPOINT,
-                            TIMEOUT)
-                    .get();
-            fail("Previous statement should have failed");
-        } catch (ExecutionException t) {
-            assertTrue(t.getCause() instanceof 
UnavailableDispatcherOperationException);
-        }
+        assertThatThrownBy(
+                        () ->
+                                dispatcherGateway
+                                        .triggerSavepointAndGetLocation(
+                                                jobId,
+                                                "file:///tmp/savepoint",
+                                                SavepointFormatType.CANONICAL,
+                                                TriggerSavepointMode.SAVEPOINT,
+                                                TIMEOUT)
+                                        .get())
+                
.hasCauseInstanceOf(UnavailableDispatcherOperationException.class);
     }
 
     @Test
@@ -490,8 +476,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
         // wait for job to finish
         dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
         // sanity check
-        assertThat(
-                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), 
is(JobStatus.CANCELED));
+        assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+                .isSameAs(JobStatus.CANCELED);
 
         dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
     }
@@ -524,16 +510,15 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         // wait for job to finish
         dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
         // sanity check
-        assertThat(
-                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), 
is(JobStatus.FINISHED));
+        assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+                .isSameAs(JobStatus.FINISHED);
 
         final CompletableFuture<Acknowledge> cancelFuture =
                 dispatcherGateway.cancelJob(jobId, TIMEOUT);
 
-        assertThat(
-                cancelFuture,
-                FlinkMatchers.futureWillCompleteExceptionally(
-                        FlinkJobTerminatedWithoutCancellationException.class, 
Duration.ofHours(8)));
+        FlinkAssertions.assertThatFuture(cancelFuture)
+                .eventuallyFails()
+                
.withCauseOfType(FlinkJobTerminatedWithoutCancellationException.class);
     }
 
     @Test
@@ -570,10 +555,10 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         // wait for job to finish
         dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
         // sanity check
-        assertThat(
-                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), 
is(JobStatus.SUSPENDED));
+        assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get())
+                .isSameAs(JobStatus.SUSPENDED);
 
-        assertThat(archiveAttemptFuture.isDone(), is(false));
+        assertThat(archiveAttemptFuture).isNotDone();
     }
 
     @Test
@@ -615,7 +600,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
         // get failure cause
         ArchivedExecutionGraph execGraph =
                 dispatcherGateway.requestJob(jobGraph.getJobID(), 
TIMEOUT).get();
-        assertThat(execGraph.getState(), is(JobStatus.FAILED));
+        assertThat(execGraph.getState()).isSameAs(JobStatus.FAILED);
 
         Assert.assertNotNull(execGraph.getFailureInfo());
         Throwable throwable =
@@ -625,7 +610,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                         .deserializeError(ClassLoader.getSystemClassLoader());
 
         // ensure correct exception type
-        assertThat(throwable.getMessage(), equalTo(testFailure.getMessage()));
+        assertThat(throwable).hasMessage(testFailure.getMessage());
     }
 
     /** Test that {@link JobResult} is cached when the job finishes. */
@@ -654,13 +639,12 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
         dispatcher.completeJobExecution(failedExecutionGraphInfo);
 
-        assertThat(
-                dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(),
-                equalTo(expectedState));
+        assertThat(dispatcherGateway.requestJobStatus(failedJobId, 
TIMEOUT).get())
+                .isEqualTo(expectedState);
         final CompletableFuture<ExecutionGraphInfo> 
completableFutureCompletableFuture =
                 dispatcher.callAsyncInMainThread(
                         () -> 
dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT));
-        assertThat(completableFutureCompletableFuture.get(), 
is(failedExecutionGraphInfo));
+        
assertThat(completableFutureCompletableFuture.get()).isEqualTo(failedExecutionGraphInfo);
     }
 
     @Test
@@ -685,8 +669,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
         CompletableFuture<CheckpointStatsSnapshot> resultsFuture =
                 dispatcher.callAsyncInMainThread(
                         () -> dispatcher.requestCheckpointStats(jobId, 
TIMEOUT));
-        
Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
-        Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot);
+        assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
+        assertThat(resultsFuture).isCompletedWithValue(snapshot);
     }
 
     @Test
@@ -734,8 +718,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
         CompletableFuture<CheckpointStatsSnapshot> resultsFuture =
                 dispatcher.callAsyncInMainThread(
                         () -> dispatcher.requestCheckpointStats(jobId, 
TIMEOUT));
-        
Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
-        Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot);
+        assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
+        assertThat(resultsFuture).isCompletedWithValue(snapshot);
     }
 
     private CheckpointStatsSnapshot 
getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() {
@@ -763,8 +747,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
                 dispatcher.callAsyncInMainThread(
                         () -> dispatcher.requestCheckpointStats(jobId, 
TIMEOUT));
 
-        
Assertions.assertThat(resultsFuture).failsWithin(Duration.ofSeconds(1));
-        Assertions.assertThat(resultsFuture).isCompletedExceptionally();
+        assertThat(resultsFuture).failsWithin(Duration.ofSeconds(1));
+        assertThat(resultsFuture).isCompletedExceptionally();
 
         Assertions.assertThatThrownBy(resultsFuture::get)
                 .hasCauseInstanceOf(FlinkJobNotFoundException.class)
@@ -781,12 +765,8 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
 
         final DispatcherGateway dispatcherGateway =
                 dispatcher.getSelfGateway(DispatcherGateway.class);
-        try {
-            dispatcherGateway.requestJob(new JobID(), TIMEOUT).get();
-        } catch (ExecutionException e) {
-            final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
-            assertThat(throwable, instanceOf(FlinkJobNotFoundException.class));
-        }
+        assertThatThrownBy(() -> dispatcherGateway.requestJob(new JobID(), 
TIMEOUT).get())
+                .hasCauseInstanceOf(FlinkJobNotFoundException.class);
     }
 
     /** Tests that we can dispose a savepoint. */
@@ -804,11 +784,11 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         final DispatcherGateway dispatcherGateway =
                 dispatcher.getSelfGateway(DispatcherGateway.class);
 
-        assertThat(Files.exists(savepointPath), is(true));
+        assertThat(Files.exists(savepointPath)).isTrue();
 
         dispatcherGateway.disposeSavepoint(externalPointer.toString(), 
TIMEOUT).get();
 
-        assertThat(Files.exists(savepointPath), is(false));
+        assertThat(Files.exists(savepointPath)).isFalse();
     }
 
     @Nonnull
@@ -892,10 +872,8 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
         final Throwable error =
                 fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMillis(), 
TimeUnit.MILLISECONDS);
 
-        assertThat(
-                ExceptionUtils.findThrowableWithMessage(error, 
testException.getMessage())
-                        .isPresent(),
-                is(true));
+        assertThat(ExceptionUtils.findThrowableWithMessage(error, 
testException.getMessage()))
+                .isPresent();
 
         fatalErrorHandler.clearError();
     }
@@ -946,15 +924,13 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
         final TestingJobManagerRunner cleanupRunner =
                 cleanupRunnerFactory.takeCreatedJobManagerRunner();
-        assertThat(
-                "The CleanupJobManagerRunner has the wrong job ID attached.",
-                cleanupRunner.getJobID(),
-                is(jobIdOfRecoveredDirtyJobs));
+        assertThat(cleanupRunner.getJobID())
+                .as("The CleanupJobManagerRunner has the wrong job ID 
attached.")
+                .isEqualTo(jobIdOfRecoveredDirtyJobs);
 
-        assertThat(
-                "No JobMaster should have been started.",
-                jobManagerRunnerFactory.getQueueSize(),
-                is(0));
+        assertThat(jobManagerRunnerFactory.getQueueSize())
+                .as("No JobMaster should have been started.")
+                .isZero();
     }
 
     @Test
@@ -975,11 +951,11 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 dispatcher.getSelfGateway(DispatcherGateway.class);
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-        assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));
+        assertThat(dispatcher.getNumberJobs(TIMEOUT).get()).isOne();
 
         dispatcher.close();
 
-        assertThat(submittedExecutionPlanStore.contains(jobGraph.getJobID()), 
Matchers.is(true));
+        
assertThat(submittedExecutionPlanStore.contains(jobGraph.getJobID())).isTrue();
     }
 
     /** Tests that a submitted job is suspended if the Dispatcher is 
terminated. */
@@ -998,12 +974,12 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         final CompletableFuture<JobResult> jobResultFuture =
                 dispatcherGateway.requestJobResult(jobGraph.getJobID(), 
TIMEOUT);
 
-        assertThat(jobResultFuture.isDone(), is(false));
+        assertThat(jobResultFuture).isNotDone();
 
         dispatcher.close();
 
         final JobResult jobResult = jobResultFuture.get();
-        assertEquals(jobResult.getApplicationStatus(), 
ApplicationStatus.UNKNOWN);
+        
assertThat(jobResult.getApplicationStatus()).isSameAs(ApplicationStatus.UNKNOWN);
     }
 
     @Test
@@ -1093,13 +1069,10 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
         processFuture.join();
 
-        assertThat(releaseJobGraphFuture.get(), is(jobGraph.getJobID()));
+        assertThat(releaseJobGraphFuture.get()).isEqualTo(jobGraph.getJobID());
 
-        try {
-            removeJobGraphFuture.get(10L, TimeUnit.MILLISECONDS);
-            fail("onRemovedExecutionPlan should not remove the job from the 
ExecutionPlanStore.");
-        } catch (TimeoutException expected) {
-        }
+        assertThatThrownBy(() -> removeJobGraphFuture.get(10L, 
TimeUnit.MILLISECONDS))
+                .isInstanceOf(TimeoutException.class);
     }
 
     @Test
@@ -1119,7 +1092,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         final long initializationTimestamp = 
initializationTimestampQueue.take();
 
         // ensure all statuses are set in the ExecutionGraph
-        assertThat(initializationTimestamp, greaterThan(0L));
+        assertThat(initializationTimestamp).isGreaterThan(0L);
     }
 
     @Test
@@ -1286,8 +1259,8 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
     private static void assertOnlyContainsSingleJobWithState(
             final JobStatus expectedJobStatus, final MultipleJobsDetails 
multipleJobsDetails) {
         final Collection<JobDetails> finishedJobDetails = 
multipleJobsDetails.getJobs();
-        assertEquals(1, finishedJobDetails.size());
-        assertEquals(expectedJobStatus, 
finishedJobDetails.iterator().next().getStatus());
+        assertThat(finishedJobDetails).hasSize(1);
+        
assertThat(finishedJobDetails.iterator().next().getStatus()).isEqualTo(expectedJobStatus);
     }
 
     @Test
@@ -1304,7 +1277,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                         .setRecoveredJobs(Collections.singleton(new 
JobGraph(jobId1, "foobar")))
                         .build(rpcService);
 
-        Assertions.assertThat(blobServer.getFile(jobId1, 
blobKey1)).hasBinaryContent(fileContent);
+        assertThat(blobServer.getFile(jobId1, 
blobKey1)).hasBinaryContent(fileContent);
         Assertions.assertThatThrownBy(() -> blobServer.getFile(jobId2, 
blobKey2))
                 .isInstanceOf(NoSuchFileException.class);
     }
@@ -1326,16 +1299,16 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         submitFuture.get();
         final ArchivedExecutionGraph archivedExecutionGraph =
                 dispatcherGateway.requestJob(failedJobId, TIMEOUT).get();
-        
Assertions.assertThat(archivedExecutionGraph.getJobID()).isEqualTo(failedJobId);
-        
Assertions.assertThat(archivedExecutionGraph.getJobName()).isEqualTo(failedJobName);
-        
Assertions.assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
-        Assertions.assertThat(archivedExecutionGraph.getFailureInfo())
+        assertThat(archivedExecutionGraph.getJobID()).isEqualTo(failedJobId);
+        
assertThat(archivedExecutionGraph.getJobName()).isEqualTo(failedJobName);
+        
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
+        assertThat(archivedExecutionGraph.getFailureInfo())
                 .isNotNull()
                 .extracting(ErrorInfo::getException)
                 .extracting(e -> 
e.deserializeError(Thread.currentThread().getContextClassLoader()))
                 .satisfies(
                         exception ->
-                                Assertions.assertThat(exception)
+                                assertThat(exception)
                                         .isInstanceOf(RuntimeException.class)
                                         .hasMessage("Test exception."));
     }
@@ -1430,8 +1403,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 .isInstanceOf(RestHandlerException.class)
                 .satisfies(
                         e ->
-                                Assertions.assertThat(
-                                                ((RestHandlerException) 
e).getHttpResponseStatus())
+                                assertThat(((RestHandlerException) 
e).getHttpResponseStatus())
                                         
.isSameAs(HttpResponseStatus.INTERNAL_SERVER_ERROR));
 
         // verify that persist errors prevents the requirement from being 
applied
@@ -1509,8 +1481,8 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 testConcurrentModificationIsPrevented(
                         dispatcherGateway, blockingJobMaster, secondJobGraph);
 
-        Assertions.assertThat(firstPendingUpdateFuture).isNotCompleted();
-        Assertions.assertThat(secondPendingUpdateFuture).isNotCompleted();
+        assertThat(firstPendingUpdateFuture).isNotCompleted();
+        assertThat(secondPendingUpdateFuture).isNotCompleted();
         blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
         assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
         assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();
@@ -1541,8 +1513,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 .isInstanceOf(RestHandlerException.class)
                 .satisfies(
                         e ->
-                                Assertions.assertThat(
-                                                ((RestHandlerException) 
e).getHttpResponseStatus())
+                                assertThat(((RestHandlerException) 
e).getHttpResponseStatus())
                                         
.isSameAs(HttpResponseStatus.CONFLICT));
         assertThatFuture(pendingUpdateFuture).isNotCompleted();
 
@@ -1634,7 +1605,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 Collection<FailureEnricher> failureEnricher,
                 long initializationTimestamp)
                 throws Exception {
-            assertEquals(expectedJobId, graph.getJobID());
+            assertThat(graph.getJobID()).isEqualTo(expectedJobId);
             final JobMasterGateway jobMasterGateway =
                     new TestingJobMasterGatewayBuilder()
                             .setRequestJobSupplier(
@@ -1943,7 +1914,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
-            assertEquals(expectedJobId, graph.getJobID());
+            assertThat(graph.getJobID()).isEqualTo(expectedJobId);
 
             return 
JobMasterServiceLeadershipRunnerFactory.INSTANCE.createJobManagerRunner(
                     graph,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
index 21ed3e30d50..116a25d07c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.entrypoint.component;
 
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
 import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
@@ -36,9 +37,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DispatcherResourceManagerComponent}. */
 public class DispatcherResourceManagerComponentTest extends TestLogger {
@@ -59,7 +58,7 @@ public class DispatcherResourceManagerComponentTest extends 
TestLogger {
         terminationFuture.completeExceptionally(expectedException);
 
         final Throwable error = fatalErrorHandler.getException();
-        assertThat(error, containsCause(expectedException));
+        assertThat(error).hasCause(expectedException);
     }
 
     private DispatcherResourceManagerComponent 
createDispatcherResourceManagerComponent(
@@ -94,7 +93,7 @@ public class DispatcherResourceManagerComponentTest extends 
TestLogger {
         terminationFuture.completeExceptionally(expectedException);
 
         final CompletableFuture<Throwable> errorFuture = 
fatalErrorHandler.getErrorFuture();
-        assertThat(errorFuture, willNotComplete(Duration.ofMillis(10L)));
+        
FlinkAssertions.assertThatFuture(errorFuture).willNotCompleteWithin(Duration.ofMillis(10L));
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 955338f9f3e..a52c1501ccd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.heartbeat;
 
-import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
@@ -29,7 +29,6 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
-import org.hamcrest.Matcher;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -51,14 +50,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link HeartbeatManager}. */
 public class HeartbeatManagerTest extends TestLogger {
@@ -116,12 +109,12 @@ public class HeartbeatManagerTest extends TestLogger {
         final String inputPayload1 = "foobar";
         heartbeatManager.requestHeartbeat(targetResourceID, inputPayload1);
 
-        assertThat(reportedPayloads.take(), is(inputPayload1));
-        assertThat(reportedPayloadsHeartbeatTarget.take(), is(outputPayload));
+        assertThat(reportedPayloads.take()).isEqualTo(inputPayload1);
+        
assertThat(reportedPayloadsHeartbeatTarget.take()).isEqualTo(outputPayload);
 
         final String inputPayload2 = "barfoo";
         heartbeatManager.receiveHeartbeat(targetResourceID, inputPayload2);
-        assertThat(reportedPayloads.take(), is(inputPayload2));
+        assertThat(reportedPayloads.take()).isEqualTo(inputPayload2);
     }
 
     /** Tests that the heartbeat monitors are updated when receiving a new 
heartbeat signal. */
@@ -157,9 +150,9 @@ public class HeartbeatManagerTest extends TestLogger {
         final List<ScheduledFuture<?>> scheduledTasksAfterHeartbeat =
                 manuallyTriggeredScheduledExecutor.getAllScheduledTasks();
 
-        assertThat(scheduledTasksAfterHeartbeat, hasSize(2));
+        assertThat(scheduledTasksAfterHeartbeat).hasSize(2);
         // the first scheduled future should be cancelled by the heartbeat 
update
-        assertTrue(scheduledTasksAfterHeartbeat.get(0).isCancelled());
+        assertThat(scheduledTasksAfterHeartbeat.get(0).isCancelled()).isTrue();
     }
 
     /** Tests that a heartbeat timeout is signaled if the heartbeat is not 
reported in time. */
@@ -197,12 +190,12 @@ public class HeartbeatManagerTest extends TestLogger {
             Thread.sleep(HEARTBEAT_INTERVAL);
         }
 
-        assertFalse(timeoutFuture.isDone());
+        FlinkAssertions.assertThatFuture(timeoutFuture).eventuallySucceeds();
 
         ResourceID timeoutResourceID =
                 timeoutFuture.get(2 * HEARTBEAT_TIMEOUT, 
TimeUnit.MILLISECONDS);
 
-        assertEquals(targetResourceID, timeoutResourceID);
+        assertThat(targetResourceID).isEqualTo(timeoutResourceID);
     }
 
     /**
@@ -264,20 +257,19 @@ public class HeartbeatManagerTest extends TestLogger {
 
         Thread.sleep(2 * HEARTBEAT_TIMEOUT);
 
-        assertFalse(targetHeartbeatTimeoutFuture.isDone());
+        assertThat(targetHeartbeatTimeoutFuture).isNotDone();
 
         heartbeatManagerTarget.stop();
 
         ResourceID timeoutResourceID =
                 targetHeartbeatTimeoutFuture.get(2 * HEARTBEAT_TIMEOUT, 
TimeUnit.MILLISECONDS);
 
-        assertThat(timeoutResourceID, is(resourceIdTarget));
+        assertThat(timeoutResourceID).isEqualTo(resourceIdTarget);
 
         int numberHeartbeats = (int) (2 * HEARTBEAT_TIMEOUT / 
HEARTBEAT_INTERVAL);
 
-        final Matcher<Integer> numberHeartbeatsMatcher = 
greaterThanOrEqualTo(numberHeartbeats / 2);
-        assertThat(numReportPayloadCallsTarget.get(), 
is(numberHeartbeatsMatcher));
-        assertThat(numReportPayloadCallsSender.get(), 
is(numberHeartbeatsMatcher));
+        
assertThat(numReportPayloadCallsTarget.get()).isGreaterThanOrEqualTo(numberHeartbeats
 / 2);
+        
assertThat(numReportPayloadCallsSender.get()).isGreaterThanOrEqualTo(numberHeartbeats
 / 2);
     }
 
     /** Tests that after unmonitoring a target, there won't be a timeout 
triggered. */
@@ -311,12 +303,9 @@ public class HeartbeatManagerTest extends TestLogger {
 
         heartbeatManager.unmonitorTarget(targetID);
 
-        try {
-            timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
-            fail("Timeout should time out.");
-        } catch (TimeoutException ignored) {
-            // the timeout should not be completed since we unmonitored the 
target
-        }
+        assertThatThrownBy(() -> timeoutFuture.get(2 * heartbeatTimeout, 
TimeUnit.MILLISECONDS))
+                // the timeout should not be completed since we unmonitored 
the target
+                .isInstanceOf(TimeoutException.class);
     }
 
     /** Tests that the last heartbeat from an unregistered target equals -1. */
@@ -337,7 +326,7 @@ public class HeartbeatManagerTest extends TestLogger {
                         LOG);
 
         try {
-            assertEquals(-1L, 
heartbeatManager.getLastHeartbeatFrom(ResourceID.generate()));
+            
assertThat(heartbeatManager.getLastHeartbeatFrom(ResourceID.generate())).isEqualTo(-1L);
         } finally {
             heartbeatManager.stop();
         }
@@ -363,13 +352,14 @@ public class HeartbeatManagerTest extends TestLogger {
             heartbeatManager.monitorTarget(
                     target, new 
TestingHeartbeatTargetBuilder<>().createTestingHeartbeatTarget());
 
-            assertEquals(0L, heartbeatManager.getLastHeartbeatFrom(target));
+            assertThat(heartbeatManager.getLastHeartbeatFrom(target)).isZero();
 
             final long currentTime = System.currentTimeMillis();
 
             heartbeatManager.receiveHeartbeat(target, null);
 
-            assertTrue(heartbeatManager.getLastHeartbeatFrom(target) >= 
currentTime);
+            assertThat(heartbeatManager.getLastHeartbeatFrom(target))
+                    .isGreaterThanOrEqualTo(currentTime);
         } finally {
             heartbeatManager.stop();
         }
@@ -429,10 +419,11 @@ public class HeartbeatManagerTest extends TestLogger {
             heartbeatManager.monitorTarget(specialTargetId, 
specialHeartbeatTarget);
 
             heartbeatManager.requestHeartbeat(someTargetId, null);
-            assertThat(someHeartbeatPayloadFuture.get(), 
is(payloads.get(someTargetId)));
+            
assertThat(someHeartbeatPayloadFuture.get()).isEqualTo(payloads.get(someTargetId));
 
             heartbeatManager.requestHeartbeat(specialTargetId, null);
-            assertThat(specialHeartbeatPayloadFuture.get(), 
is(payloads.get(specialTargetId)));
+            assertThat(specialHeartbeatPayloadFuture.get())
+                    .isEqualTo(payloads.get(specialTargetId));
         } finally {
             heartbeatManager.stop();
         }
@@ -482,9 +473,10 @@ public class HeartbeatManagerTest extends TestLogger {
             someTargetReceivedLatch.await(5, TimeUnit.SECONDS);
             specialTargetReceivedLatch.await(5, TimeUnit.SECONDS);
 
-            assertEquals(defaultResponse, 
someHeartbeatTarget.getLastRequestedHeartbeatPayload());
-            assertEquals(
-                    specialResponse, 
specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
+            assertThat(defaultResponse)
+                    
.isEqualTo(someHeartbeatTarget.getLastRequestedHeartbeatPayload());
+            assertThat(specialResponse)
+                    
.isEqualTo(specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
         } finally {
             heartbeatManager.stop();
             scheduledThreadPoolExecutor.shutdown();
@@ -568,9 +560,8 @@ public class HeartbeatManagerTest extends TestLogger {
             for (int i = 0; i < failedRpcRequestsUntilUnreachable - 1; i++) {
                 heartbeatManager.requestHeartbeat(someTargetId, null);
 
-                assertThat(
-                        unreachableTargetFuture,
-                        FlinkMatchers.willNotComplete(willNotCompleteWithin));
+                FlinkAssertions.assertThatFuture(unreachableTargetFuture)
+                        .willNotCompleteWithin(willNotCompleteWithin);
             }
 
             heartbeatManager.requestHeartbeat(someTargetId, null);
@@ -619,8 +610,8 @@ public class HeartbeatManagerTest extends TestLogger {
                 heartbeatManager.requestHeartbeat(someTargetId, null);
             }
 
-            assertThat(
-                    unreachableTargetFuture, 
FlinkMatchers.willNotComplete(willNotCompleteWithin));
+            FlinkAssertions.assertThatFuture(unreachableTargetFuture)
+                    .willNotCompleteWithin(willNotCompleteWithin);
         } finally {
             heartbeatManager.stop();
         }
@@ -668,8 +659,8 @@ public class HeartbeatManagerTest extends TestLogger {
                 heartbeatManager.requestHeartbeat(someTargetId, null);
             }
 
-            assertThat(
-                    unreachableTargetFuture, 
FlinkMatchers.willNotComplete(willNotCompleteWithin));
+            FlinkAssertions.assertThatFuture(unreachableTargetFuture)
+                    .willNotCompleteWithin(willNotCompleteWithin);
         } finally {
             heartbeatManager.stop();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
index a3c25539b8d..5454e5e59db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import 
org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
@@ -36,7 +35,6 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
-import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,13 +53,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Tests for {@link DefaultExecutionPlanStore} with {@link 
TestingExecutionPlanStoreWatcher}, {@link
@@ -104,8 +97,8 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
 
         final ExecutionPlan recoveredExecutionPlan =
                 
executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());
-        assertThat(recoveredExecutionPlan, is(notNullValue()));
-        assertThat(recoveredExecutionPlan.getJobID(), 
is(testingExecutionPlan.getJobID()));
+        assertThat(recoveredExecutionPlan).isNotNull();
+        
assertThat(recoveredExecutionPlan.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
     }
 
     @Test
@@ -123,7 +116,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
 
         final ExecutionPlan recoveredExecutionPlan =
                 
executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());
-        assertThat(recoveredExecutionPlan, is(nullValue()));
+        assertThat(recoveredExecutionPlan).isNull();
     }
 
     @Test
@@ -141,15 +134,13 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         final ExecutionPlanStore executionPlanStore =
                 createAndStartExecutionPlanStore(stateHandleStore);
 
-        try {
-            
executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());
-            fail(
-                    "recoverExecutionPlan should fail when there is exception 
in getting the state handle.");
-        } catch (Exception ex) {
-            assertThat(ex, FlinkMatchers.containsCause(testException));
-            String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS);
-            assertThat(actual, is(testingExecutionPlan.getJobID().toString()));
-        }
+        assertThatThrownBy(
+                        () ->
+                                executionPlanStore.recoverExecutionPlan(
+                                        testingExecutionPlan.getJobID()))
+                .hasCause(testException);
+        String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS);
+        assertThat(testingExecutionPlan.getJobID()).hasToString(actual);
     }
 
     @Test
@@ -169,7 +160,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         executionPlanStore.putExecutionPlan(testingExecutionPlan);
 
         final ExecutionPlan actual = addFuture.get(timeout, 
TimeUnit.MILLISECONDS);
-        assertThat(actual.getJobID(), is(testingExecutionPlan.getJobID()));
+        
assertThat(actual.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
     }
 
     @Test
@@ -200,9 +191,9 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
 
         final Tuple3<String, IntegerResourceVersion, ExecutionPlan> actual =
                 replaceFuture.get(timeout, TimeUnit.MILLISECONDS);
-        assertThat(actual.f0, is(testingExecutionPlan.getJobID().toString()));
-        assertThat(actual.f1, 
is(IntegerResourceVersion.valueOf(resourceVersion)));
-        assertThat(actual.f2.getJobID(), is(testingExecutionPlan.getJobID()));
+        
assertThat(actual.f0).isEqualTo(testingExecutionPlan.getJobID().toString());
+        
assertThat(actual.f1).isEqualTo(IntegerResourceVersion.valueOf(resourceVersion));
+        
assertThat(actual.f2.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
     }
 
     @Test
@@ -221,7 +212,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
                 .globalCleanupAsync(testingExecutionPlan.getJobID(), 
Executors.directExecutor())
                 .join();
         final JobID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS);
-        assertThat(actual, is(testingExecutionPlan.getJobID()));
+        assertThat(actual).isEqualTo(testingExecutionPlan.getJobID());
     }
 
     @Test
@@ -237,7 +228,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
                 .globalCleanupAsync(testingExecutionPlan.getJobID(), 
Executors.directExecutor())
                 .join();
 
-        assertThat(removeFuture.isDone(), is(true));
+        assertThat(removeFuture).isDone();
     }
 
     @Test
@@ -247,13 +238,14 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
 
         final ExecutionPlanStore executionPlanStore =
                 createAndStartExecutionPlanStore(stateHandleStore);
-        assertThrows(
-                ExecutionException.class,
-                () ->
-                        executionPlanStore
-                                .globalCleanupAsync(
-                                        testingExecutionPlan.getJobID(), 
Executors.directExecutor())
-                                .get());
+        assertThatThrownBy(
+                        () ->
+                                executionPlanStore
+                                        .globalCleanupAsync(
+                                                
testingExecutionPlan.getJobID(),
+                                                Executors.directExecutor())
+                                        .get())
+                .isInstanceOf(ExecutionException.class);
     }
 
     @Test
@@ -270,7 +262,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         final ExecutionPlanStore executionPlanStore =
                 createAndStartExecutionPlanStore(stateHandleStore);
         final Collection<JobID> jobIds = executionPlanStore.getJobIds();
-        assertThat(jobIds, contains(existingJobIds.toArray()));
+        assertThat(jobIds).containsAll(existingJobIds);
     }
 
     @Test
@@ -283,7 +275,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         executionPlanStore.putExecutionPlan(testingExecutionPlan);
 
         
testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());
-        
assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(0));
+        
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
     }
 
     @Test
@@ -303,8 +295,8 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         // Unknown job
         final JobID unknownJobId = JobID.generate();
         testingExecutionPlanStoreWatcher.addExecutionPlan(unknownJobId);
-        
assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(1));
-        assertThat(testingExecutionPlanListener.getAddedExecutionPlans(), 
contains(unknownJobId));
+        
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).hasSize(1);
+        
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).contains(unknownJobId);
     }
 
     @Test
@@ -320,10 +312,9 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         testingExecutionPlanStoreWatcher.removeExecutionPlan(JobID.generate());
         // Known job
         
testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());
-        
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(), 
is(1));
-        assertThat(
-                testingExecutionPlanListener.getRemovedExecutionPlans(),
-                contains(testingExecutionPlan.getJobID()));
+        
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).hasSize(1);
+        assertThat(testingExecutionPlanListener.getRemovedExecutionPlans())
+                .contains(testingExecutionPlan.getJobID());
     }
 
     @Test
@@ -334,7 +325,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         createAndStartExecutionPlanStore(stateHandleStore);
 
         
testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());
-        
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(), 
is(0));
+        
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
     }
 
     @Test
@@ -347,7 +338,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         executionPlanStore.stop();
 
         
testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());
-        
assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(0));
+        
assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
     }
 
     @Test
@@ -361,7 +352,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
         executionPlanStore.stop();
 
         
testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());
-        
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(), 
is(0));
+        
assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
     }
 
     @Test
@@ -374,7 +365,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
                 createAndStartExecutionPlanStore(stateHandleStore);
         executionPlanStore.stop();
 
-        assertThat(completableFuture.isDone(), is(true));
+        assertThat(completableFuture).isDone();
     }
 
     @Test
@@ -390,7 +381,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
                 .join();
 
         final String actual = releaseFuture.get();
-        assertThat(actual, is(testingExecutionPlan.getJobID().toString()));
+        assertThat(testingExecutionPlan.getJobID()).hasToString(actual);
     }
 
     @Test
@@ -450,7 +441,7 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
                 JobResourceRequirements.readFromExecutionPlan(
                         Objects.requireNonNull(
                                 (JobGraph) 
executionPlanStore.recoverExecutionPlan(jobId)));
-        Assertions.assertThat(maybeRecovered).get().isEqualTo(expected);
+        assertThat(maybeRecovered).get().isEqualTo(expected);
     }
 
     @Test
@@ -463,11 +454,11 @@ public class DefaultExecutionPlanStoreTest extends 
TestLogger {
                         .build();
         final ExecutionPlanStore executionPlanStore =
                 createAndStartExecutionPlanStore(stateHandleStore);
-        assertThrows(
-                NoSuchElementException.class,
-                () ->
-                        executionPlanStore.putJobResourceRequirements(
-                                new JobID(), JobResourceRequirements.empty()));
+        assertThatThrownBy(
+                        () ->
+                                executionPlanStore.putJobResourceRequirements(
+                                        new JobID(), 
JobResourceRequirements.empty()))
+                .isInstanceOf(NoSuchElementException.class);
     }
 
     private ExecutionPlanStore createAndStartExecutionPlanStore(
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index dea19ebb948..9746ea9bafb 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -44,13 +44,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
 import static 
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
 import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Tests for {@link CatalogTable} to {@link ResolvedCatalogTable}, {@link 
CatalogMaterializedTable}
@@ -252,41 +250,30 @@ class CatalogBaseTableResolutionTest {
 
     @Test
     void testPropertyDeserializationError() {
-        try {
-            final Map<String, String> properties = catalogTableAsProperties();
-            properties.remove("schema.4.data-type");
-            CatalogTable.fromProperties(properties);
-            fail("unknown failure");
-        } catch (Exception e) {
-            assertThat(e)
-                    .satisfies(
-                            matching(
-                                    containsMessage(
-                                            "Could not find property key 
'schema.4.data-type'.")));
-        }
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> properties = 
catalogTableAsProperties();
+                            properties.remove("schema.4.data-type");
+                            CatalogTable.fromProperties(properties);
+                        })
+                .hasRootCauseMessage("Could not find property key 
'schema.4.data-type'.");
     }
 
     @Test
     void testInvalidPartitionKeys() {
         final CatalogTable catalogTable =
-                CatalogTable.of(
-                        TABLE_SCHEMA,
-                        null,
-                        Arrays.asList("region", "countyINVALID"),
-                        Collections.emptyMap());
-
-        try {
-            resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable);
-            fail("Invalid partition keys expected.");
-        } catch (Exception e) {
-            assertThat(e)
-                    .satisfies(
-                            matching(
-                                    containsMessage(
-                                            "Invalid partition key 
'countyINVALID'. A partition key must "
-                                                    + "reference a physical 
column in the schema. Available "
-                                                    + "columns are: [id, 
region, county]")));
-        }
+                CatalogTable.newBuilder()
+                        .schema(TABLE_SCHEMA)
+                        .comment(null)
+                        .partitionKeys(Arrays.asList("region", 
"countyINVALID"))
+                        .options(Collections.emptyMap())
+                        .build();
+
+        assertThatThrownBy(() -> 
resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable))
+                .hasRootCauseMessage(
+                        "Invalid partition key 'countyINVALID'. A partition 
key must "
+                                + "reference a physical column in the schema. 
Available "
+                                + "columns are: [id, region, county]");
     }
 
     @Test
@@ -317,18 +304,11 @@ class CatalogBaseTableResolutionTest {
                         Collections.emptyMap(),
                         null,
                         
TableDistribution.ofHash(Collections.singletonList("countyINVALID"), 6));
-        try {
-            resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable);
-            fail("Invalid bucket keys expected.");
-        } catch (Exception e) {
-            assertThat(e)
-                    .satisfies(
-                            matching(
-                                    containsMessage(
-                                            "Invalid bucket key 
'countyINVALID'. A bucket key for a distribution must "
-                                                    + "reference a physical 
column in the schema. "
-                                                    + "Available columns are: 
[id, region, county]")));
-        }
+        assertThatThrownBy(() -> 
resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable))
+                .hasRootCauseMessage(
+                        "Invalid bucket key 'countyINVALID'. A bucket key for 
a distribution must "
+                                + "reference a physical column in the schema. "
+                                + "Available columns are: [id, region, 
county]");
     }
 
     @Test
@@ -342,17 +322,10 @@ class CatalogBaseTableResolutionTest {
                         null,
                         
TableDistribution.ofHash(Collections.singletonList("id"), 0));
 
-        try {
-            resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable);
-            fail("Invalid bucket keys expected.");
-        } catch (Exception e) {
-            assertThat(e)
-                    .satisfies(
-                            matching(
-                                    containsMessage(
-                                            "Invalid bucket count '0'. The 
number of buckets for a "
-                                                    + "distributed table must 
be at least 1.")));
-        }
+        assertThatThrownBy(() -> 
resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable))
+                .hasRootCauseMessage(
+                        "Invalid bucket count '0'. The number of buckets for a 
"
+                                + "distributed table must be at least 1.");
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 75f6fce9d3b..f69bc791c07 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.expressions.CallExpression;
@@ -47,8 +46,7 @@ import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRow
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link Schema}, {@link DefaultSchemaResolver}, and {@link 
ResolvedSchema}. */
 class SchemaResolutionTest {
@@ -434,12 +432,8 @@ class SchemaResolutionTest {
     }
 
     private static void testError(Schema schema, String errorMessage, boolean 
isStreaming) {
-        try {
-            resolveSchema(schema, isStreaming);
-            fail("Error message expected: " + errorMessage);
-        } catch (Throwable t) {
-            
assertThat(t).satisfies(matching(FlinkMatchers.containsMessage(errorMessage)));
-        }
+        assertThatThrownBy(() -> resolveSchema(schema, isStreaming))
+                .hasMessageContaining(errorMessage);
     }
 
     private static ResolvedSchema resolveSchema(Schema schema) {
diff --git 
a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
 
b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
index 79880f76142..0115ad2285a 100644
--- 
a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
+++ 
b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.types.extraction.DataTypeExtractorTest._
 
 import org.assertj.core.api.Assertions.assertThatThrownBy
-import org.assertj.core.api.HamcrestCondition
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -37,7 +36,7 @@ class DataTypeExtractorScalaTest {
   def testScalaExtraction(testSpec: DataTypeExtractorTest.TestSpec): Unit = {
     if (testSpec.hasErrorMessage) {
       assertThatThrownBy(() => runExtraction(testSpec))
-        .is(HamcrestCondition.matching(errorMatcher(testSpec)))
+        .hasRootCauseMessage(testSpec.expectedErrorMessage)
         .isInstanceOf[ValidationException]
     } else {
       runExtraction(testSpec)
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
index 22b7b6d5c64..f6a389b5a43 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
+++ 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.table.codesplit;
 
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.jupiter.api.Disabled;
@@ -28,7 +27,6 @@ import java.io.File;
 import static org.apache.flink.table.codesplit.CodeSplitTestUtil.trimLines;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.HamcrestCondition.matching;
 
 /** Tests for {@link JavaCodeSplitter}. */
 class JavaCodeSplitterTest {
@@ -46,15 +44,12 @@ class JavaCodeSplitterTest {
     @Test
     @Disabled("Disabled in because of 
https://issues.apache.org/jira/browse/FLINK-27702";)
     void testInvalidJavaCode() {
-        try {
-            JavaCodeSplitter.split("public class InvalidClass { return 1; }", 
4000, 10000);
-        } catch (Exception e) {
-            assertThat(e)
-                    .satisfies(
-                            matching(
-                                    FlinkMatchers.containsMessage(
-                                            "JavaCodeSplitter failed. This is 
a bug. Please file an issue.")));
-        }
+        assertThatThrownBy(
+                        () ->
+                                JavaCodeSplitter.split(
+                                        "public class InvalidClass { return 1; 
}", 4000, 10000))
+                .hasMessageContaining(
+                        "JavaCodeSplitter failed. This is a bug. Please file 
an issue.");
     }
 
     @Test
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index 21eed2f0277..5c27ae62a3e 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -44,7 +44,6 @@ import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.utils.DataTypeFactoryMock;
 import org.apache.flink.types.Row;
 
-import org.hamcrest.Matcher;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
@@ -64,7 +63,6 @@ import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.table.test.TableAssertions.assertThat;
 import static org.apache.flink.table.types.utils.DataTypeFactoryMock.dummyRaw;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -506,7 +504,7 @@ class DataTypeExtractorTest {
 
         private @Nullable DataType expectedDataType;
 
-        private @Nullable String expectedErrorMessage;
+        @Nullable String expectedErrorMessage;
 
         private TestSpec(
                 @Nullable String description, Function<DataTypeFactory, 
DataType> extractor) {
@@ -626,10 +624,6 @@ class DataTypeExtractorTest {
         }
     }
 
-    static Matcher<Throwable> errorMatcher(TestSpec testSpec) {
-        return containsCause(new 
ValidationException(testSpec.expectedErrorMessage));
-    }
-
     /** Testing data type shared with the Scala tests. */
     static DataType getSimplePojoDataType(Class<?> simplePojoClass) {
         final StructuredType.Builder builder = 
StructuredType.newBuilder(simplePojoClass);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
index 49e158a34f4..6f677a34a3e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.codegen;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -57,7 +56,6 @@ import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
 
 /** Tests for code generations with code splitting. */
 class CodeSplitTest {
@@ -256,7 +254,7 @@ class CodeSplitTest {
             consumer.accept(noSplitTableConfig);
             fail("Expecting compiler exception");
         } catch (Exception e) {
-            
assertThat(e).satisfies(matching(FlinkMatchers.containsMessage("grows beyond 64 
KB")));
+            assertThat(e).hasRootCauseMessage("Code grows beyond 64 KB");
         } finally {
             // set stdout back
             System.setOut(originalStdOut);
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
index 44f030ddedf..f2df69a0989 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java
@@ -34,10 +34,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.assertj.core.api.HamcrestCondition.matching;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Utilities for testing {@link SliceAssigner}s. */
 abstract class SliceAssignerTestBase {
@@ -51,12 +49,7 @@ abstract class SliceAssignerTestBase {
     }
 
     protected static void assertErrorMessage(Runnable runnable, String 
errorMessage) {
-        try {
-            runnable.run();
-            fail("should fail.");
-        } catch (Exception e) {
-            assertThat(e).satisfies(matching(containsMessage(errorMessage)));
-        }
+        assertThatThrownBy(runnable::run).hasMessageContaining(errorMessage);
     }
 
     protected static long assignSliceEnd(SliceAssigner assigner, long 
timestamp) {
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
index 8a8bb1cdebf..76abcbc43c5 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java
@@ -77,6 +77,23 @@ public class FlinkCompletableFutureAssert<T>
                     (ThrowableAssertAlternative<T>) throwableAssert;
             return cast;
         }
+
+        /**
+         * Checks that the underlying throwable has cause of the given type 
and returns a {@link
+         * ThrowableAssertAlternative} to chain further assertions on the 
underlying throwable.
+         *
+         * @param cause the expected {@link Throwable} cause
+         * @param <T> the expected {@link Throwable} cause
+         * @return a {@link ThrowableAssertAlternative} built with underlying 
throwable.
+         */
+        public <T extends Throwable> ThrowableAssertAlternative<T> 
withCauseOfType(Class<T> cause) {
+            final ThrowableAssertAlternative<Throwable> throwableAssert =
+                    new 
ThrowableAssertAlternative<>(throwable).withCauseInstanceOf(cause);
+            @SuppressWarnings("unchecked")
+            final ThrowableAssertAlternative<T> cast =
+                    (ThrowableAssertAlternative<T>) throwableAssert;
+            return cast;
+        }
     }
 
     FlinkCompletableFutureAssert(CompletableFuture<T> actual) {
@@ -121,8 +138,6 @@ public class FlinkCompletableFutureAssert<T>
     /**
      * Assert that {@link CompletableFuture} will not complete within a fixed 
duration.
      *
-     * <p>This is a replacement for {@link 
FlinkMatchers#willNotComplete(Duration)} in assertj.
-     *
      * @return {@code this} assertion object.
      */
     public FlinkCompletableFutureAssert<T> willNotCompleteWithin(Duration 
duration) {
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
deleted file mode 100644
index 2eaa65ae04a..00000000000
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.testutils;
-
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.time.Duration;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
-import java.util.function.Predicate;
-
-/**
- * Some reusable hamcrest matchers for Flink.
- *
- * @deprecated You should assertj assertions, which have built-in assertions 
for {@link
- *     CompletableFuture}. To check chains of {@link Throwable} causes, use 
{@link
- *     FlinkAssertions#anyCauseMatches(String)} or {@link 
FlinkAssertions#anyCauseMatches(Class,
- *     String)}
- */
-@Deprecated
-public class FlinkMatchers {
-
-    // ------------------------------------------------------------------------
-    //  factories
-    // ------------------------------------------------------------------------
-
-    /**
-     * Checks whether {@link CompletableFuture} completed already 
exceptionally with a specific
-     * exception type.
-     */
-    public static <T, E extends Throwable> FutureFailedMatcher<T> 
futureFailedWith(
-            Class<E> exceptionType) {
-        Objects.requireNonNull(exceptionType, "exceptionType should not be 
null");
-        return new FutureFailedMatcher<>(exceptionType);
-    }
-
-    /**
-     * Checks whether {@link CompletableFuture} will completed exceptionally 
within a certain time.
-     */
-    public static <T, E extends Throwable> FutureWillFailMatcher<T> 
futureWillCompleteExceptionally(
-            Class<E> exceptionType, Duration timeout) {
-        Objects.requireNonNull(exceptionType, "exceptionType should not be 
null");
-        Objects.requireNonNull(timeout, "timeout should not be null");
-        return new FutureWillFailMatcher<>(exceptionType, timeout);
-    }
-
-    /**
-     * Checks whether {@link CompletableFuture} will completed exceptionally 
within a certain time.
-     */
-    public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(
-            Function<Throwable, Boolean> exceptionCheck,
-            Duration timeout,
-            String checkDescription) {
-        Objects.requireNonNull(exceptionCheck, "exceptionType should not be 
null");
-        Objects.requireNonNull(timeout, "timeout should not be null");
-        return new FutureWillFailMatcher<>(exceptionCheck, timeout, 
checkDescription);
-    }
-
-    /**
-     * Checks whether {@link CompletableFuture} will completed exceptionally 
within a certain time.
-     */
-    public static <T> FutureWillFailMatcher<T> 
futureWillCompleteExceptionally(Duration timeout) {
-        return futureWillCompleteExceptionally(Throwable.class, timeout);
-    }
-
-    /** Checks for a {@link Throwable} that matches by class. */
-    public static Matcher<Throwable> containsCause(Class<? extends Throwable> 
failureCause) {
-        return new ContainsCauseMatcher(failureCause);
-    }
-
-    /** Checks for a {@link Throwable} that matches by class and message. */
-    public static Matcher<Throwable> containsCause(Throwable failureCause) {
-        return new ContainsCauseAndMessageMatcher(failureCause);
-    }
-
-    /** Checks for a {@link Throwable} that contains the expected error 
message. */
-    public static Matcher<Throwable> containsMessage(String errorMessage) {
-        return new ContainsMessageMatcher(errorMessage);
-    }
-
-    /** Checks that a {@link CompletableFuture} won't complete within the 
given timeout. */
-    public static Matcher<CompletableFuture<?>> willNotComplete(Duration 
timeout) {
-        return new WillNotCompleteMatcher(timeout);
-    }
-
-    // ------------------------------------------------------------------------
-
-    /** This class should not be instantiated. */
-    private FlinkMatchers() {}
-
-    // ------------------------------------------------------------------------
-    //  matcher implementations
-    // ------------------------------------------------------------------------
-
-    private static final class FutureFailedMatcher<T>
-            extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
-
-        private final Class<? extends Throwable> expectedException;
-
-        FutureFailedMatcher(Class<? extends Throwable> expectedException) {
-            super(CompletableFuture.class);
-            this.expectedException = expectedException;
-        }
-
-        @Override
-        protected boolean matchesSafely(
-                CompletableFuture<T> future, Description mismatchDescription) {
-            if (!future.isDone()) {
-                mismatchDescription.appendText("Future is not completed.");
-                return false;
-            }
-
-            if (!future.isCompletedExceptionally()) {
-                Object result = future.getNow(null);
-                assert result != null;
-                mismatchDescription.appendText(
-                        "Future did not complete exceptionally, but instead 
regularly with: "
-                                + result);
-                return false;
-            }
-
-            try {
-                future.getNow(null);
-                throw new Error();
-            } catch (CompletionException e) {
-                if (e.getCause() != null
-                        && 
expectedException.isAssignableFrom(e.getCause().getClass())) {
-                    return true;
-                }
-
-                mismatchDescription.appendText(
-                        "Future completed with different exception: " + 
e.getCause());
-                return false;
-            }
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText(
-                    "A CompletableFuture that failed with: " + 
expectedException.getName());
-        }
-    }
-
-    private static final class FutureWillFailMatcher<T>
-            extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
-
-        private final Function<Throwable, Boolean> exceptionValidator;
-
-        private final Duration timeout;
-
-        private final String validationDescription;
-
-        FutureWillFailMatcher(Class<? extends Throwable> expectedException, 
Duration timeout) {
-
-            super(CompletableFuture.class);
-            this.exceptionValidator = (e) -> 
expectedException.isAssignableFrom(e.getClass());
-            this.timeout = timeout;
-            this.validationDescription = expectedException.getName();
-        }
-
-        FutureWillFailMatcher(
-                Function<Throwable, Boolean> exceptionValidator,
-                Duration timeout,
-                String validationDescription) {
-
-            super(CompletableFuture.class);
-            this.exceptionValidator = exceptionValidator;
-            this.timeout = timeout;
-            this.validationDescription = validationDescription;
-        }
-
-        @Override
-        protected boolean matchesSafely(
-                CompletableFuture<T> future, Description mismatchDescription) {
-            try {
-                final Object result = future.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
-                mismatchDescription.appendText(
-                        "Future did not complete exceptionally, but instead 
regularly with: "
-                                + result);
-                return false;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new Error("interrupted test");
-            } catch (TimeoutException e) {
-                mismatchDescription.appendText(
-                        "Future did not complete withing " + 
timeout.toMillis() + " milliseconds.");
-                return false;
-            } catch (ExecutionException e) {
-                final Throwable cause = e.getCause();
-                if (cause != null && exceptionValidator.apply(cause)) {
-                    return true;
-                }
-
-                String otherDescription = "(null)";
-                if (cause != null) {
-                    final StringWriter stm = new StringWriter();
-                    try (PrintWriter wrt = new PrintWriter(stm)) {
-                        cause.printStackTrace(wrt);
-                    }
-                    otherDescription = stm.toString();
-                }
-
-                mismatchDescription.appendText(
-                        "Future completed with different exception: " + 
otherDescription);
-                return false;
-            }
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText(
-                    "A CompletableFuture that will have failed within "
-                            + timeout.toMillis()
-                            + " milliseconds with: "
-                            + validationDescription);
-        }
-    }
-
-    private static final class ContainsCauseMatcher extends 
TypeSafeDiagnosingMatcher<Throwable> {
-
-        private final Class<? extends Throwable> failureCause;
-
-        private ContainsCauseMatcher(Class<? extends Throwable> failureCause) {
-            this.failureCause = failureCause;
-        }
-
-        @Override
-        protected boolean matchesSafely(Throwable throwable, Description 
description) {
-            final Optional<Throwable> optionalCause =
-                    findThrowable(throwable, cause -> cause.getClass() == 
failureCause);
-
-            if (!optionalCause.isPresent()) {
-                description
-                        .appendText("The throwable ")
-                        .appendValue(throwable)
-                        .appendText(" does not contain the expected failure 
cause ")
-                        .appendValue(failureCause.getSimpleName());
-            }
-
-            return optionalCause.isPresent();
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description
-                    .appendText("Expected failure cause is ")
-                    .appendValue(failureCause.getSimpleName());
-        }
-    }
-
-    private static final class ContainsCauseAndMessageMatcher
-            extends TypeSafeDiagnosingMatcher<Throwable> {
-
-        private final Throwable failureCause;
-
-        private ContainsCauseAndMessageMatcher(Throwable failureCause) {
-            this.failureCause = failureCause;
-        }
-
-        @Override
-        protected boolean matchesSafely(Throwable throwable, Description 
description) {
-            final Optional<Throwable> optionalCause =
-                    findThrowable(
-                            throwable,
-                            cause ->
-                                    cause.getClass() == failureCause.getClass()
-                                            && cause.getMessage()
-                                                    
.equals(failureCause.getMessage()));
-
-            if (!optionalCause.isPresent()) {
-                description
-                        .appendText("The throwable ")
-                        .appendValue(throwable)
-                        .appendText(" does not contain the expected failure 
cause ")
-                        .appendValue(failureCause);
-            }
-
-            return optionalCause.isPresent();
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText("Expected failure cause is 
").appendValue(failureCause);
-        }
-    }
-
-    private static final class ContainsMessageMatcher extends 
TypeSafeDiagnosingMatcher<Throwable> {
-
-        private final String errorMessage;
-
-        private ContainsMessageMatcher(String errorMessage) {
-            this.errorMessage = errorMessage;
-        }
-
-        @Override
-        protected boolean matchesSafely(Throwable throwable, Description 
description) {
-            final Optional<Throwable> optionalCause =
-                    findThrowable(throwable, this::containsErrorMessage);
-
-            if (!optionalCause.isPresent()) {
-                description
-                        .appendText("The throwable ")
-                        .appendValue(throwable)
-                        .appendText(" does not contain the expected error 
message ")
-                        .appendValue(errorMessage);
-            }
-
-            return optionalCause.isPresent();
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText("Expected error message is 
").appendValue(errorMessage);
-        }
-
-        private boolean containsErrorMessage(Throwable t) {
-            return t.getMessage() != null && 
t.getMessage().contains(errorMessage);
-        }
-    }
-
-    // copied from flink-core to not mess up the dependency design too much, 
just for a little
-    // utility method
-    private static Optional<Throwable> findThrowable(
-            Throwable throwable, Predicate<Throwable> predicate) {
-        if (throwable == null || predicate == null) {
-            return Optional.empty();
-        }
-
-        Throwable t = throwable;
-        while (t != null) {
-            if (predicate.test(t)) {
-                return Optional.of(t);
-            } else {
-                t = t.getCause();
-            }
-        }
-
-        return Optional.empty();
-    }
-
-    private static final class WillNotCompleteMatcher
-            extends TypeSafeDiagnosingMatcher<CompletableFuture<?>> {
-
-        private final Duration timeout;
-
-        private WillNotCompleteMatcher(Duration timeout) {
-            this.timeout = timeout;
-        }
-
-        @Override
-        protected boolean matchesSafely(
-                CompletableFuture<?> item, Description mismatchDescription) {
-
-            try {
-                final Object value = item.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
-                mismatchDescription
-                        .appendText("The given future completed with ")
-                        .appendValue(value);
-            } catch (TimeoutException timeoutException) {
-                return true;
-            } catch (InterruptedException e) {
-                mismatchDescription.appendText("The waiting thread was 
interrupted.");
-            } catch (ExecutionException e) {
-                mismatchDescription
-                        .appendText("The given future was completed 
exceptionally: ")
-                        .appendValue(e);
-            }
-
-            return false;
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description
-                    .appendText("The given future should not complete within ")
-                    .appendValue(timeout.toMillis())
-                    .appendText(" ms.");
-        }
-    }
-}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
index 9e5638ccc4e..965d1ebef6b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
@@ -36,11 +36,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.util.TestLogger;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration tests for the {@link JobMaster}. */
 public class JobMasterITCase extends TestLogger {
@@ -58,9 +58,9 @@ public class JobMasterITCase extends TestLogger {
 
         try {
             miniCluster.getMiniCluster().submitJob(jobGraph).get();
-            fail("Expect failure");
+            Assertions.fail("Expect failure");
         } catch (Throwable t) {
-            assertThat(t, containsMessage("The given job is empty"));
+            assertThat(t).hasRootCauseMessage("The given job is empty");
         } finally {
             miniCluster.after();
         }
@@ -85,11 +85,7 @@ public class JobMasterITCase extends TestLogger {
                 see.fromSource(mySource, WatermarkStrategy.noWatermarks(), 
"MySourceName");
         stream.sinkTo(new DiscardingSink<>());
 
-        try {
-            see.execute();
-        } catch (Exception e) {
-            assertThat(e, containsMessage("Context was not yet initialized"));
-        }
+        assertThatThrownBy(see::execute).hasRootCauseMessage("Context was not 
yet initialized");
     }
 
     private static class FailOnInitializationSource implements Source<String, 
MockSplit, Void> {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index f15cb467032..e2136169728 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -58,12 +58,12 @@ import 
org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
-import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -84,15 +84,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.util.ExceptionUtils.assertThrowable;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.either;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Integration tests for the adaptive scheduler. */
 public class AdaptiveSchedulerITCase extends TestLogger {
@@ -124,7 +118,7 @@ public class AdaptiveSchedulerITCase extends TestLogger {
 
     @Before
     public void ensureAdaptiveSchedulerEnabled() {
-        assumeTrue(ClusterOptions.isAdaptiveSchedulerEnabled(configuration));
+        
assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
     }
 
     @After
@@ -172,8 +166,8 @@ public class AdaptiveSchedulerITCase extends TestLogger {
                                 savepointDirectory.getAbsolutePath(),
                                 SavepointFormatType.CANONICAL)
                         .get();
-        assertThat(savepoint, 
containsString(savepointDirectory.getAbsolutePath()));
-        assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+        assertThat(savepoint).contains(savepointDirectory.getAbsolutePath());
+        assertThat(client.getJobStatus().get()).isSameAs(JobStatus.FINISHED);
     }
 
     @Test
@@ -187,16 +181,14 @@ public class AdaptiveSchedulerITCase extends TestLogger {
         JobClient client = env.executeAsync();
 
         DummySource.awaitRunning();
-        try {
-            client.stopWithSavepoint(
-                            false,
-                            
tempFolder.newFolder("savepoint").getAbsolutePath(),
-                            SavepointFormatType.CANONICAL)
-                    .get();
-            fail("Expect exception");
-        } catch (ExecutionException e) {
-            assertThat(e, containsCause(FlinkException.class));
-        }
+        assertThatThrownBy(
+                        () ->
+                                client.stopWithSavepoint(
+                                                false,
+                                                
tempFolder.newFolder("savepoint").getAbsolutePath(),
+                                                SavepointFormatType.CANONICAL)
+                                        .get())
+                .hasCauseInstanceOf(FlinkException.class);
         // expect job to run again (maybe restart)
         CommonTestUtils.waitUntilCondition(() -> client.getJobStatus().get() 
== JobStatus.RUNNING);
     }
@@ -217,18 +209,18 @@ public class AdaptiveSchedulerITCase extends TestLogger {
                         false,
                         tempFolder.newFolder("savepoint").getAbsolutePath(),
                         SavepointFormatType.CANONICAL);
-        final Throwable savepointException =
-                assertThrows(ExecutionException.class, 
savepointCompleted::get).getCause();
-        assertThrowable(
-                savepointException,
-                throwable ->
-                        throwable instanceof StopWithSavepointStoppingException
-                                && throwable
-                                        .getMessage()
-                                        .startsWith("A savepoint has been 
created at: "));
-        assertThat(
-                client.getJobStatus().get(),
-                either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING)));
+        assertThatThrownBy(savepointCompleted::get)
+                .isInstanceOf(ExecutionException.class)
+                .satisfies(
+                        e ->
+                                assertThat(
+                                                ExceptionUtils.findThrowable(
+                                                                e,
+                                                                
StopWithSavepointStoppingException
+                                                                        .class)
+                                                        .get())
+                                        .hasMessageContaining("A savepoint has 
been created at: "));
+        assertThat(client.getJobStatus().get()).isIn(JobStatus.FAILED, 
JobStatus.FAILING);
     }
 
     @Test
@@ -248,16 +240,14 @@ public class AdaptiveSchedulerITCase extends TestLogger {
         DummySource.awaitRunning();
         DummySource.resetForParallelism(PARALLELISM);
         final File savepointDirectory = tempFolder.newFolder("savepoint");
-        try {
-            client.stopWithSavepoint(
-                            false,
-                            savepointDirectory.getAbsolutePath(),
-                            SavepointFormatType.CANONICAL)
-                    .get();
-            fail("Expect failure of operation");
-        } catch (ExecutionException e) {
-            assertThat(e, containsCause(FlinkException.class));
-        }
+        assertThatThrownBy(
+                        () ->
+                                client.stopWithSavepoint(
+                                                false,
+                                                
savepointDirectory.getAbsolutePath(),
+                                                SavepointFormatType.CANONICAL)
+                                        .get())
+                .hasCauseInstanceOf(FlinkException.class);
 
         DummySource.awaitRunning();
 
@@ -273,7 +263,7 @@ public class AdaptiveSchedulerITCase extends TestLogger {
                                 savepointDirectory.getAbsolutePath(),
                                 SavepointFormatType.CANONICAL)
                         .get();
-        assertThat(savepoint, 
containsString(savepointDirectory.getAbsolutePath()));
+        assertThat(savepoint).contains(savepointDirectory.getAbsolutePath());
     }
 
     @Test
@@ -335,22 +325,20 @@ public class AdaptiveSchedulerITCase extends TestLogger {
 
         // there should be exactly 1 root exception in the history from the 
failing vertex,
         // as the global coordinator failure should be treated as a concurrent 
exception
-        Assertions.assertThat(jobExceptions.getExceptionHistory().getEntries())
+        assertThat(jobExceptions.getExceptionHistory().getEntries())
                 .hasSize(1)
                 .allSatisfy(
                         rootExceptionInfo ->
-                                
Assertions.assertThat(rootExceptionInfo.getStacktrace())
+                                assertThat(rootExceptionInfo.getStacktrace())
                                         
.contains(FailingInvokable.localExceptionMsg)
                                         .doesNotContain(
                                                 
FailingCoordinatorProvider.globalExceptionMsg))
                 .allSatisfy(
                         rootExceptionInfo ->
-                                
Assertions.assertThat(rootExceptionInfo.getConcurrentExceptions())
+                                
assertThat(rootExceptionInfo.getConcurrentExceptions())
                                         .anySatisfy(
                                                 exceptionInfo ->
-                                                        Assertions.assertThat(
-                                                                        
exceptionInfo
-                                                                               
 .getStacktrace())
+                                                        
assertThat(exceptionInfo.getStacktrace())
                                                                 .contains(
                                                                         
FailingCoordinatorProvider
                                                                                
 .globalExceptionMsg)));

Reply via email to