1996fanrui commented on code in PR #23204:
URL: https://github.com/apache/flink/pull/23204#discussion_r1295487947
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java:
##########
@@ -233,18 +237,16 @@ public void
canReconnectToOldLeaderWithSameLeaderAddress() throws Exception {
leaderRetrievalService.notifyListener(
jobMasterGateway.getAddress(),
jobMasterGateway.getFencingToken().toUUID());
- try {
- newLeaderFuture.get(10, TimeUnit.MILLISECONDS);
- fail("The leader future should not be completed.");
- } catch (TimeoutException expected) {
- }
+ assertThatThrownBy(() -> newLeaderFuture.get(10,
TimeUnit.MILLISECONDS))
+ .withFailMessage("The leader future should not be
completed.")
+ .isInstanceOf(TimeoutException.class);
Review Comment:
```suggestion
FlinkAssertions.assertThatFuture(newLeaderFuture).willNotCompleteWithin(Duration.ofMillis(10));
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java:
##########
@@ -124,7 +132,7 @@ private static CompletableFuture<JobResult>
submitJobAndWaitUntilRunning(
final CompletableFuture<JobResult> jobResultFuture =
miniCluster.requestJobResult(jobGraph.getJobID());
- assertThat(jobResultFuture.isDone(), is(false));
+ assertThat(jobResultFuture.isDone()).isFalse();
Review Comment:
```suggestion
assertThat(jobResultFuture).isNotDone();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java:
##########
@@ -192,7 +190,7 @@ public void testDeployedExecutionReporting() throws
Exception {
// nothing as deployed, so the deployment report should be empty
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId,
slotAllocationReport);
- assertThat(deployedExecutionsQueue.take(), hasSize(0));
+ assertThat(deployedExecutionsQueue.take()).hasSize(0);
Review Comment:
```suggestion
assertThat(deployedExecutionsQueue.take()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java:
##########
@@ -278,7 +280,7 @@ public void
rejectedJobManagerRegistrationCallsJobLeaderListener() throws Except
leaderRetrievalService.notifyListener(
jobMasterGateway.getAddress(),
jobMasterGateway.getFencingToken().toUUID());
- assertThat(rejectedRegistrationFuture.get(), is(jobId));
+ assertThat(rejectedRegistrationFuture.get()).isEqualTo(jobId);
Review Comment:
```suggestion
assertThat(rejectedRegistrationFuture).isCompletedWithValue(jobId);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java:
##########
@@ -49,11 +47,11 @@ public void
testTaskSlotClosedOnlyWhenAddedTasksTerminated() throws Exception {
task.waitForFailure();
MemoryManager memoryManager = taskSlot.getMemoryManager();
- assertThat(closingFuture.isDone(), is(false));
- assertThat(memoryManager.isShutdown(), is(false));
+ assertThat(closingFuture.isDone()).isFalse();
Review Comment:
```suggestion
assertThat(closingFuture).isNotDone();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/DefaultTimerServiceTest.java:
##########
@@ -50,22 +47,22 @@ public void testUnregisterAllTimeouts() throws Exception {
timerService.unregisterAllTimeouts();
Map<?, ?> timeouts = timerService.getTimeouts();
- assertTrue(timeouts.isEmpty());
+ assertThat(timeouts.isEmpty()).isTrue();
Review Comment:
```suggestion
assertThat(timeouts).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -222,14 +207,15 @@ public void
testWorkingDirIsSetupWhenStartingTaskManagerRunner() throws Exceptio
final TaskManagerRunner taskManagerRunner =
createTaskManagerRunner(configuration);
try {
- assertTrue(workingDir.exists());
+ assertThat(workingDir.exists()).isTrue();
} finally {
taskManagerRunner.close();
}
- assertFalse(
- "The working dir should be cleaned up when stopping the
TaskManager process gracefully.",
- workingDir.exists());
+ assertThat(workingDir.exists())
+ .withFailMessage(
+ "The working dir should be cleaned up when stopping
the TaskManager process gracefully.")
+ .isFalse();
Review Comment:
```suggestion
assertThat(workingDir)
.withFailMessage(
"The working dir should be cleaned up when stopping
the TaskManager process gracefully.")
.doesNotExist();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -222,14 +207,15 @@ public void
testWorkingDirIsSetupWhenStartingTaskManagerRunner() throws Exceptio
final TaskManagerRunner taskManagerRunner =
createTaskManagerRunner(configuration);
try {
- assertTrue(workingDir.exists());
+ assertThat(workingDir.exists()).isTrue();
Review Comment:
```suggestion
assertThat(workingDir).exists();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nonnull;
import java.io.File;
import java.net.InetAddress;
+import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)
+class TaskManagerRunnerTest {
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
-
- @Rule public final Timeout timeout = Timeout.seconds(30);
+ @TempDir private Path temporaryFolder;
private TaskManagerRunner taskManagerRunner;
- @After
- public void after() throws Exception {
+ @AfterEach
+ void after() throws Exception {
if (taskManagerRunner != null) {
taskManagerRunner.close();
}
}
@Test
- public void testShouldShutdownOnFatalError() throws Exception {
+ void testShouldShutdownOnFatalError() throws Exception {
Configuration configuration = createConfiguration();
// very high timeout, to ensure that we don't fail because of
registration timeouts
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT,
TimeUtils.parseDuration("42 h"));
taskManagerRunner = createTaskManagerRunner(configuration);
taskManagerRunner.onFatalError(new RuntimeException());
- assertThat(
- taskManagerRunner.getTerminationFuture().join(),
- is(equalTo(TaskManagerRunner.Result.FAILURE)));
+ assertThat(taskManagerRunner.getTerminationFuture().join())
+ .isEqualTo(TaskManagerRunner.Result.FAILURE);
}
@Test
- public void testShouldShutdownIfRegistrationWithJobManagerFails() throws
Exception {
+ void testShouldShutdownIfRegistrationWithJobManagerFails() throws
Exception {
Configuration configuration = createConfiguration();
configuration.set(
TaskManagerOptions.REGISTRATION_TIMEOUT,
TimeUtils.parseDuration("10 ms"));
taskManagerRunner = createTaskManagerRunner(configuration);
- assertThat(
- taskManagerRunner.getTerminationFuture().join(),
- is(equalTo(TaskManagerRunner.Result.FAILURE)));
+ assertThat(taskManagerRunner.getTerminationFuture().join())
+ .isEqualTo(TaskManagerRunner.Result.FAILURE);
Review Comment:
```suggestion
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.FAILURE);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nonnull;
import java.io.File;
import java.net.InetAddress;
+import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)
Review Comment:
Why add the `Timeout` here?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -220,7 +216,7 @@ private void
testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
trackerIsTrackingPartitions.set(true);
- assertThat(firstReleasePartitionsCallFuture.isDone(), is(false));
+ assertThat(firstReleasePartitionsCallFuture.isDone()).isFalse();
Review Comment:
```suggestion
assertThat(firstReleasePartitionsCallFuture).isNotDone();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2125,7 +2099,7 @@ public void testOfferSlotToJobMasterAfterTimeout() throws
Exception {
new TestingJobMasterGatewayBuilder()
.setOfferSlotsFunction(
(resourceID, slotOffers) -> {
- assertThat(slotOffers.size(), is(1));
+ assertThat(slotOffers.size()).isOne();
Review Comment:
```suggestion
assertThat(slotOffers).hasSize(1);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java:
##########
@@ -38,42 +38,50 @@
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.function.Supplier;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Integration tests for the {@link TaskExecutor}. */
-public class TaskExecutorITCase extends TestLogger {
+class TaskExecutorITCase {
private static final int NUM_TMS = 2;
private static final int SLOTS_PER_TM = 2;
private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
- @Rule
- public final MiniClusterResource miniClusterResource =
+ private final MiniClusterResource miniClusterResource =
Review Comment:
Using the `MiniClusterExtension` instead, and the `@BeforeEach` and
`@AfterEach` can be removed.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java:
##########
@@ -60,31 +59,31 @@
import static
org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
Review Comment:
The `assertThat` is wrong package.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2018,21 +1993,20 @@ public void
testRegisterWithDefaultSlotResourceProfile() throws Exception {
testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway.getFencingToken().toUUID());
- assertThat(
- registeredDefaultSlotResourceProfileFuture.get(),
- equalTo(
+ assertThat(registeredDefaultSlotResourceProfileFuture.get())
+ .isEqualTo(
Review Comment:
```suggestion
assertThat(registeredDefaultSlotResourceProfileFuture).isCompletedWithValue(
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java:
##########
@@ -203,16 +201,15 @@ public void testDeployedExecutionReporting() throws
Exception {
// task is deployed, so the deployment report should contain it
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId,
slotAllocationReport);
- assertThat(
- deployedExecutionsQueue.take(),
- hasItem(taskDeploymentDescriptor.getExecutionAttemptId()));
+ assertThat(deployedExecutionsQueue.take())
+
.contains(taskDeploymentDescriptor.getExecutionAttemptId());
TestingInvokable.sync.releaseBlocker();
// task is finished ans was cleaned up, so the deployment report
should be empty
taskFinishedFuture.get();
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId,
slotAllocationReport);
- assertThat(deployedExecutionsQueue.take(), hasSize(0));
+ assertThat(deployedExecutionsQueue.take()).hasSize(0);
Review Comment:
```suggestion
assertThat(deployedExecutionsQueue.take()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -231,7 +227,7 @@ private void
testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(
firstReleasePartitionsCallFuture.get();
// connection should be kept alive since the table still contains
partitions
- assertThat(disconnectFuture.isDone(), is(false));
+ assertThat(disconnectFuture.isDone()).isFalse();
Review Comment:
```suggestion
assertThat(disconnectFuture).isNotDone()();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -255,12 +251,12 @@ public void
testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
(jobId, resultPartitionDeploymentDescriptor, taskExecutor,
taskExecutorGateway) -> {
taskExecutorGateway.disconnectJobManager(jobId, new
Exception("test"));
- assertThat(releasePartitionsForJobFuture.get(),
equalTo(jobId));
+
assertThat(releasePartitionsForJobFuture.get()).isEqualTo(jobId);
Review Comment:
```suggestion
assertThat(releasePartitionsForJobFuture).isCompletedWithValue(jobId);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java:
##########
@@ -146,20 +148,20 @@ public void testSubmitTaskFailure() throws Exception {
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
- .build(EXECUTOR_RESOURCE.getExecutor())) {
+ .build(EXECUTOR_EXTENSION.getExecutor())) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(),
Time.seconds(60));
- tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
- } catch (Exception e) {
- assertThat(e.getCause(),
instanceOf(IllegalArgumentException.class));
+
+ assertThatThrownBy(() -> tmGateway.submitTask(tdd,
env.getJobMasterId(), timeout).get())
+ .hasCauseInstanceOf(IllegalArgumentException.class);
Review Comment:
```suggestion
assertThatFuture(mGateway.submitTask(tdd, env.getJobMasterId(),
timeout))
.eventuallyFailsWith(ExecutionException.class)
.withCauseInstanceOf(IllegalArgumentException.class);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2234,19 +2208,20 @@ public void testDisconnectFromJobMasterWhenNewLeader()
throws Exception {
jobManagerLeaderRetriever.notifyListener(
jobMasterGateway.getAddress(), UUID.randomUUID());
- assertThat(offeredSlotsFuture.get(), is(1));
+ assertThat(offeredSlotsFuture.get()).isOne();
// notify loss of leadership
jobManagerLeaderRetriever.notifyListener(null, null);
- assertThat(disconnectFuture.get(), is(resourceID));
+ assertThat(disconnectFuture.get()).isEqualTo(resourceID);
Review Comment:
```suggestion
assertThat(disconnectFuture).isCompletedWithValue(resourceID);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -544,12 +540,11 @@ private void internalTestPartitionRelease(
TestingInvokable.sync.awaitBlocker();
// the task is still running => the partition is in in-progress
and should be tracked
- assertThat(
- startTrackingFuture.get(),
- equalTo(
+ assertThat(startTrackingFuture.get())
+ .isEqualTo(
Review Comment:
```suggestion
.isCompletedWithValue(
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -3086,13 +3058,13 @@ public void testSharedResourcesLifecycle() throws
Exception {
ctx.taskExecutor.cancelTask(executions.get(i), timeout).get();
waitForTasks(ctx, numTasks -> numTasks > numRemaining);
if (numRemaining > 0) {
- assertEquals(0,
SharedResourceCollectingInvokable.timesDeallocated.get());
+
assertThat(SharedResourceCollectingInvokable.timesDeallocated.get()).isZero();
}
}
}
// verify
- assertEquals(1,
SharedResourceCollectingInvokable.timesAllocated.get());
- assertEquals(1,
SharedResourceCollectingInvokable.timesDeallocated.get());
+
assertThat(SharedResourceCollectingInvokable.timesAllocated.get()).isOne();
+
assertThat(SharedResourceCollectingInvokable.timesDeallocated.get()).isOne();
Review Comment:
```suggestion
assertThat(SharedResourceCollectingInvokable.timesAllocated).hasValue(1);
assertThat(SharedResourceCollectingInvokable.timesDeallocated).hasValue(1);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -204,14 +190,13 @@ public void
testUnexpectedTaskManagerTerminationAfterRunnerCloseWillBeIgnored()
terminationFuture.complete(null);
- assertThat(
- taskManagerRunner.getTerminationFuture().join(),
- is(equalTo(TaskManagerRunner.Result.SUCCESS)));
+ assertThat(taskManagerRunner.getTerminationFuture().join())
+ .isEqualTo(TaskManagerRunner.Result.SUCCESS);
Review Comment:
```suggestion
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.SUCCESS);
```
I guess these `CompletableFuture.join()` can be replaced by
`isCompletedWithValue`, please correct my if i'm wrong, thanks!
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -258,10 +244,11 @@ public void testWorkingDirIsNotDeletedInCaseOfFailure()
throws Exception {
taskManagerRunner.getTerminationFuture().join();
- assertTrue(
- ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
- configuration, resourceId)
- .exists());
+ assertThat(
+
ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
+ configuration, resourceId)
+ .exists())
+ .isTrue();
Review Comment:
```suggestion
assertThat(
ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
configuration, resourceId)).exists();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2257,34 +2232,31 @@ public void testLogNotFoundHandling() throws Throwable {
new Builder(jobId)
.setConfiguration(configuration)
.setLocalCommunication(false)
- .build(EXECUTOR_RESOURCE.getExecutor())) {
+ .build(EXECUTOR_EXTENSION.getExecutor())) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- try {
- CompletableFuture<TransientBlobKey> logFuture =
- tmGateway.requestFileUploadByType(FileType.LOG,
timeout);
- logFuture.get();
- } catch (Exception e) {
- assertThat(
- e.getMessage(),
- containsString("The file LOG does not exist on the
TaskExecutor."));
- }
+ CompletableFuture<TransientBlobKey> logFuture =
+ tmGateway.requestFileUploadByType(FileType.LOG, timeout);
+ assertThatThrownBy(logFuture::get)
+ .hasMessageContaining("The file LOG does not exist on the
TaskExecutor.")
+ .isInstanceOf(ExecutionException.class);
Review Comment:
```suggestion
FlinkAssertions
.assertThatFuture(logFuture).eventuallyFails().withThrowableOfType(ExecutionException.class).withMessageContaining("The
file LOG does not exist on the TaskExecutor.").
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -267,118 +258,110 @@ public void
testSlotAllocationWithConcreteResourceProfile() throws Exception {
ResourceProfile.newBuilder().setCpuCores(0.1).build());
assertThat(
- taskSlotTable.allocateSlot(
- -1, jobId, allocationId, resourceProfile,
SLOT_TIMEOUT),
- is(true));
+ taskSlotTable.allocateSlot(
+ -1, jobId, allocationId, resourceProfile,
SLOT_TIMEOUT))
+ .isTrue();
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
taskSlotTable.getAllocatedSlots(jobId);
TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
- assertThat(allocatedSlot.getIndex(), is(2));
- assertThat(allocatedSlot.getResourceProfile(),
is(resourceProfile));
- assertThat(allocatedSlots.hasNext(), is(false));
+ assertThat(allocatedSlot.getIndex()).isEqualTo(2);
+
assertThat(allocatedSlot.getResourceProfile()).isEqualTo(resourceProfile);
+ assertThat(allocatedSlots.hasNext()).isFalse();
}
}
@Test
- public void testSlotAllocationWithUnknownResourceProfile() throws
Exception {
+ void testSlotAllocationWithUnknownResourceProfile() throws Exception {
try (final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable =
createTaskSlotTableAndStart(2)) {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
assertThat(
- taskSlotTable.allocateSlot(
- -1, jobId, allocationId, ResourceProfile.UNKNOWN,
SLOT_TIMEOUT),
- is(true));
+ taskSlotTable.allocateSlot(
+ -1, jobId, allocationId,
ResourceProfile.UNKNOWN, SLOT_TIMEOUT))
+ .isTrue();
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
taskSlotTable.getAllocatedSlots(jobId);
TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
- assertThat(allocatedSlot.getIndex(), is(2));
- assertThat(
- allocatedSlot.getResourceProfile(),
is(TaskSlotUtils.DEFAULT_RESOURCE_PROFILE));
- assertThat(allocatedSlots.hasNext(), is(false));
+ assertThat(allocatedSlot.getIndex()).isEqualTo(2);
+ assertThat(allocatedSlot.getResourceProfile())
+ .isEqualTo(TaskSlotUtils.DEFAULT_RESOURCE_PROFILE);
+ assertThat(allocatedSlots.hasNext()).isFalse();
}
}
@Test
- public void testSlotAllocationWithResourceProfileFailure() throws
Exception {
+ void testSlotAllocationWithResourceProfileFailure() throws Exception {
try (final TaskSlotTable<TaskSlotPayload> taskSlotTable =
createTaskSlotTableAndStart(2)) {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
ResourceProfile resourceProfile =
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE;
resourceProfile =
resourceProfile.merge(resourceProfile).merge(resourceProfile);
assertThat(
- taskSlotTable.allocateSlot(
- -1, jobId, allocationId, resourceProfile,
SLOT_TIMEOUT),
- is(false));
+ taskSlotTable.allocateSlot(
+ -1, jobId, allocationId, resourceProfile,
SLOT_TIMEOUT))
+ .isFalse();
Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
taskSlotTable.getAllocatedSlots(jobId);
- assertThat(allocatedSlots.hasNext(), is(false));
+ assertThat(allocatedSlots.hasNext()).isFalse();
}
}
@Test
- public void testGenerateSlotReport() throws Exception {
+ void testGenerateSlotReport() throws Exception {
try (final TaskSlotTable<TaskSlotPayload> taskSlotTable =
createTaskSlotTableAndStart(3)) {
final JobID jobId = new JobID();
final AllocationID allocationId1 = new AllocationID();
final AllocationID allocationId2 = new AllocationID();
final AllocationID allocationId3 = new AllocationID();
- assertThat(
- taskSlotTable.allocateSlot(0, jobId, allocationId1,
SLOT_TIMEOUT),
- is(true)); // index 0
- assertThat(
- taskSlotTable.allocateSlot(-1, jobId, allocationId2,
SLOT_TIMEOUT),
- is(true)); // index 3
- assertThat(
- taskSlotTable.allocateSlot(-1, jobId, allocationId3,
SLOT_TIMEOUT),
- is(true)); // index 4
+ assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId1,
SLOT_TIMEOUT))
+ .isTrue(); // index 0
+ assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId2,
SLOT_TIMEOUT))
+ .isTrue(); // index 3
+ assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId3,
SLOT_TIMEOUT))
+ .isTrue(); // index 4
- assertThat(taskSlotTable.freeSlot(allocationId2), is(3));
+ assertThat(taskSlotTable.freeSlot(allocationId2)).isEqualTo(3);
ResourceID resourceId = ResourceID.generate();
SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
List<SlotStatus> slotStatuses = new ArrayList<>();
slotReport.iterator().forEachRemaining(slotStatuses::add);
- assertThat(slotStatuses.size(), is(4));
- assertThat(
- slotStatuses,
- containsInAnyOrder(
- is(
- new SlotStatus(
- new SlotID(resourceId, 0),
-
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
- jobId,
- allocationId1)),
- is(
- new SlotStatus(
- new SlotID(resourceId, 1),
-
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
- null,
- null)),
- is(
- new SlotStatus(
- new SlotID(resourceId, 2),
-
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
- null,
- null)),
- is(
- new SlotStatus(
- new SlotID(resourceId, 4),
-
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
- jobId,
- allocationId3))));
+ assertThat(slotStatuses.size()).isEqualTo(4);
Review Comment:
```suggestion
assertThat(slotStatuses).hasSize(4);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -427,35 +411,37 @@ public void testRemoveTaskCallsFreeSlotAction() throws
Exception {
// to enable that the last remaining finished task does the final
slot freeing
taskSlotTable.freeSlot(allocationId);
taskSlotTable.removeTask(executionAttemptId);
- assertThat(freeSlotFuture.get(), is(allocationId));
+ assertThat(freeSlotFuture.get()).isEqualTo(allocationId);
Review Comment:
```suggestion
assertThat(freeSlotFuture).isCompletedWithValue(allocationId);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -467,20 +453,21 @@ public void testAllocatedSlotTimeout() throws Exception {
createTaskSlotTableAndStart(1, testingSlotActions)) {
final AllocationID allocationId = new AllocationID();
assertThat(
- taskSlotTable.allocateSlot(0, new JobID(), allocationId,
Time.milliseconds(1L)),
- is(true));
- assertThat(timeoutFuture.join(), is(allocationId));
+ taskSlotTable.allocateSlot(
+ 0, new JobID(), allocationId,
Time.milliseconds(1L)))
+ .isTrue();
+ assertThat(timeoutFuture.join()).isEqualTo(allocationId);
Review Comment:
```suggestion
assertThat(timeoutFuture).isCompletedWithValue(allocationId);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -427,35 +411,37 @@ public void testRemoveTaskCallsFreeSlotAction() throws
Exception {
// to enable that the last remaining finished task does the final
slot freeing
taskSlotTable.freeSlot(allocationId);
taskSlotTable.removeTask(executionAttemptId);
- assertThat(freeSlotFuture.get(), is(allocationId));
+ assertThat(freeSlotFuture.get()).isEqualTo(allocationId);
}
}
- @Test(timeout = 10000)
- public void testFreeSlotInterruptsSubmittedTask() throws Exception {
+ @Test
+ @Timeout(10)
+ void testFreeSlotInterruptsSubmittedTask() throws Exception {
TestingTaskSlotPayload task = new TestingTaskSlotPayload();
try (final TaskSlotTable<TaskSlotPayload> taskSlotTable =
createTaskSlotTableWithStartedTask(task)) {
- assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+
assertThat(taskSlotTable.freeSlot(task.getAllocationId())).isEqualTo(-1);
task.waitForFailure();
task.terminate();
}
}
- @Test(timeout = 10000)
- public void testTableIsClosedOnlyWhenAllTasksTerminated() throws Exception
{
+ @Test
+ @Timeout(10)
+ void testTableIsClosedOnlyWhenAllTasksTerminated() throws Exception {
TestingTaskSlotPayload task = new TestingTaskSlotPayload();
final TaskSlotTable<TaskSlotPayload> taskSlotTable =
createTaskSlotTableWithStartedTask(task);
- assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+
assertThat(taskSlotTable.freeSlot(task.getAllocationId())).isEqualTo(-1);
CompletableFuture<Void> closingFuture = taskSlotTable.closeAsync();
- assertThat(closingFuture.isDone(), is(false));
+ assertThat(closingFuture.isDone()).isFalse();
Review Comment:
```suggestion
assertThat(closingFuture).isNotDone();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java:
##########
@@ -19,81 +19,75 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.Collections;
-import static junit.framework.TestCase.assertNotNull;
-import static junit.framework.TestCase.assertTrue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link PartitionTable}. */
-public class PartitionTableTest extends TestLogger {
+class PartitionTableTest {
private static final JobID JOB_ID = new JobID();
private static final ResultPartitionID PARTITION_ID = new
ResultPartitionID();
@Test
- public void testEmptyTable() {
+ void testEmptyTable() {
final PartitionTable<JobID> table = new PartitionTable<>();
// an empty table should always return an empty collection
Collection<ResultPartitionID> partitionsForNonExistingJob =
table.stopTrackingPartitions(JOB_ID);
- assertNotNull(partitionsForNonExistingJob);
- assertThat(partitionsForNonExistingJob, empty());
+ assertThat(partitionsForNonExistingJob).isNotNull();
+ assertThat(partitionsForNonExistingJob).isEmpty();
Review Comment:
```suggestion
assertThat(partitionsForNonExistingJob).isNotNull().isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nonnull;
import java.io.File;
import java.net.InetAddress;
+import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)
+class TaskManagerRunnerTest {
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
-
- @Rule public final Timeout timeout = Timeout.seconds(30);
+ @TempDir private Path temporaryFolder;
private TaskManagerRunner taskManagerRunner;
- @After
- public void after() throws Exception {
+ @AfterEach
+ void after() throws Exception {
if (taskManagerRunner != null) {
taskManagerRunner.close();
}
}
@Test
- public void testShouldShutdownOnFatalError() throws Exception {
+ void testShouldShutdownOnFatalError() throws Exception {
Configuration configuration = createConfiguration();
// very high timeout, to ensure that we don't fail because of
registration timeouts
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT,
TimeUtils.parseDuration("42 h"));
taskManagerRunner = createTaskManagerRunner(configuration);
taskManagerRunner.onFatalError(new RuntimeException());
- assertThat(
- taskManagerRunner.getTerminationFuture().join(),
- is(equalTo(TaskManagerRunner.Result.FAILURE)));
+ assertThat(taskManagerRunner.getTerminationFuture().join())
+ .isEqualTo(TaskManagerRunner.Result.FAILURE);
}
@Test
- public void testShouldShutdownIfRegistrationWithJobManagerFails() throws
Exception {
+ void testShouldShutdownIfRegistrationWithJobManagerFails() throws
Exception {
Configuration configuration = createConfiguration();
configuration.set(
TaskManagerOptions.REGISTRATION_TIMEOUT,
TimeUtils.parseDuration("10 ms"));
taskManagerRunner = createTaskManagerRunner(configuration);
- assertThat(
- taskManagerRunner.getTerminationFuture().join(),
- is(equalTo(TaskManagerRunner.Result.FAILURE)));
+ assertThat(taskManagerRunner.getTerminationFuture().join())
+ .isEqualTo(TaskManagerRunner.Result.FAILURE);
}
@Test
- public void testGenerateTaskManagerResourceIDWithMetaData() throws
Exception {
+ void testGenerateTaskManagerResourceIDWithMetaData() throws Exception {
final Configuration configuration = createConfiguration();
final String metadata = "test";
configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA,
metadata);
final ResourceID taskManagerResourceID =
TaskManagerRunner.getTaskManagerResourceID(configuration, "",
-1).unwrap();
- assertThat(taskManagerResourceID.getMetadata(), equalTo(metadata));
+ assertThat(taskManagerResourceID.getMetadata()).isEqualTo(metadata);
}
@Test
- public void testGenerateTaskManagerResourceIDWithoutMetaData() throws
Exception {
+ void testGenerateTaskManagerResourceIDWithoutMetaData() throws Exception {
final Configuration configuration = createConfiguration();
final String resourceID = "test";
configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID,
resourceID);
final ResourceID taskManagerResourceID =
TaskManagerRunner.getTaskManagerResourceID(configuration, "",
-1).unwrap();
- assertThat(taskManagerResourceID.getMetadata(), equalTo(""));
- assertThat(taskManagerResourceID.getStringWithMetadata(),
equalTo("test"));
+ assertThat(taskManagerResourceID.getMetadata()).isEqualTo("");
Review Comment:
```suggestion
assertThat(taskManagerResourceID.getMetadata()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nonnull;
import java.io.File;
import java.net.InetAddress;
+import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)
+class TaskManagerRunnerTest {
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
-
- @Rule public final Timeout timeout = Timeout.seconds(30);
+ @TempDir private Path temporaryFolder;
private TaskManagerRunner taskManagerRunner;
- @After
- public void after() throws Exception {
+ @AfterEach
+ void after() throws Exception {
if (taskManagerRunner != null) {
taskManagerRunner.close();
}
}
@Test
- public void testShouldShutdownOnFatalError() throws Exception {
+ void testShouldShutdownOnFatalError() throws Exception {
Configuration configuration = createConfiguration();
// very high timeout, to ensure that we don't fail because of
registration timeouts
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT,
TimeUtils.parseDuration("42 h"));
taskManagerRunner = createTaskManagerRunner(configuration);
taskManagerRunner.onFatalError(new RuntimeException());
- assertThat(
- taskManagerRunner.getTerminationFuture().join(),
- is(equalTo(TaskManagerRunner.Result.FAILURE)));
+ assertThat(taskManagerRunner.getTerminationFuture().join())
+ .isEqualTo(TaskManagerRunner.Result.FAILURE);
Review Comment:
```suggestion
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.FAILURE);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -181,14 +169,12 @@ public void
testUnexpectedTaskManagerTerminationFailsRunnerFatally() throws Exce
terminationFuture.complete(null);
- assertThat(
- taskManagerRunner.getTerminationFuture().join(),
- is(equalTo(TaskManagerRunner.Result.FAILURE)));
+ assertThat(taskManagerRunner.getTerminationFuture().join())
Review Comment:
```suggestion
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.FAILURE);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -1708,9 +1694,9 @@ public void testRemoveJobFromJobLeaderService() throws
Exception {
final SlotID slotId = buildSlotID(0);
final AllocationID allocationId = new AllocationID();
- assertThat(startFuture.isDone(), is(false));
+ assertThat(startFuture.isDone()).isFalse();
Review Comment:
```suggestion
assertThat(startFuture).isNotDone();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -893,7 +880,7 @@ public void testTaskSlotTableTerminationOnShutdown() throws
Exception {
}
// check task executor is waiting for the task completion and has not
terminated yet
- assertThat(taskExecutorTerminationFuture.isDone(), is(false));
+ assertThat(taskExecutorTerminationFuture.isDone()).isFalse();
Review Comment:
```suggestion
assertThat(taskExecutorTerminationFuture).isNotDone();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -1871,15 +1852,11 @@ public void testIgnoringSlotRequestsIfNotRegistered()
throws Exception {
testingResourceManagerGateway.getFencingToken(),
timeout);
- try {
- slotRequestResponse.get();
- fail(
- "We should not be able to request slots before the
TaskExecutor is registered at the ResourceManager.");
- } catch (ExecutionException ee) {
- assertThat(
- ExceptionUtils.stripExecutionException(ee),
- instanceOf(TaskManagerException.class));
- }
+ assertThatThrownBy(slotRequestResponse::get)
+ .withFailMessage(
+ "We should not be able to request slots before the
TaskExecutor is registered at the ResourceManager.")
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(TaskManagerException.class);
Review Comment:
```suggestion
FlinkAssertions.assertThatFuture(slotRequestResponse)
.withFailMessage(
"We should not be able to request slots before
the TaskExecutor is registered at the ResourceManager.")
.eventuallyFailsWith(ExecutionException.class)
.withCauseInstanceOf(TaskManagerException.class);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -200,13 +189,11 @@ public class TaskExecutorTest extends TestLogger {
MemorySize.parse("4m"),
Collections.emptyList());
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService>
EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
-
- @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+ @RegisterExtension
+ private static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_EXTENSION =
+ TestingUtils.defaultExecutorExtension();
- @Rule public final TestName testName = new TestName();
+ @TempDir private Path tmp;
Review Comment:
How about renaming it to `tmpDir`? The tmp is ambiguous.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -589,21 +576,21 @@ private void runResourceManagerHeartbeatTest(
// heartbeat timeout should trigger disconnect TaskManager from
ResourceManager
assertThat(
- taskExecutorDisconnectFuture.get(
- timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
- equalTo(unresolvedTaskManagerLocation.getResourceID()));
+ taskExecutorDisconnectFuture.get(
+ timeout.toMilliseconds(),
TimeUnit.MILLISECONDS))
+ .isEqualTo(unresolvedTaskManagerLocation.getResourceID());
Review Comment:
```suggestion
taskExecutorDisconnectFuture)
.succeedsWithin(timeout.toMilliseconds(),
TimeUnit.MILLISECONDS))
.isEqualTo(unresolvedTaskManagerLocation.getResourceID());
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -1980,14 +1955,14 @@ public void testInitialSlotReport() throws Exception {
testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway.getFencingToken().toUUID());
- assertThat(initialSlotReportFuture.get(),
equalTo(taskExecutor.getResourceID()));
+
assertThat(initialSlotReportFuture.get()).isEqualTo(taskExecutor.getResourceID());
Review Comment:
```suggestion
assertThat(initialSlotReportFuture).isCompletedWithValue(taskExecutor.getResourceID());
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -683,10 +670,9 @@ public void testHeartbeatReporting() throws Exception {
// register resource manager success will trigger monitoring
heartbeat target between tm
// and rm
- assertThat(
- taskExecutorRegistrationFuture.get(),
- equalTo(unresolvedTaskManagerLocation.getResourceID()));
- assertThat(initialSlotReportFuture.get(), equalTo(slotReport1));
+ assertThat(taskExecutorRegistrationFuture.get())
+ .isEqualTo(unresolvedTaskManagerLocation.getResourceID());
+ assertThat(initialSlotReportFuture.get()).isEqualTo(slotReport1);
Review Comment:
```suggestion
assertThat(taskExecutorRegistrationFuture)
.isCompletedWithValue(unresolvedTaskManagerLocation.getResourceID());
assertThat(initialSlotReportFuture).isCompletedWithValue(slotReport1);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2162,21 +2136,21 @@ public void testOfferSlotToJobMasterAfterTimeout()
throws Exception {
slotOfferings.await();
- assertThat(offeredSlotFuture.get(), is(allocationId));
- assertTrue(taskSlotTable.isSlotFree(1));
+ assertThat(offeredSlotFuture.get()).isEqualTo(allocationId);
Review Comment:
```suggestion
assertThat(offeredSlotFuture).isCompletedWithValue(allocationId);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2687,23 +2657,23 @@ public void
testReleaseOfJobResourcesIfJobMasterIsNotCorrect() throws Exception
jobMasterGateway.getAddress(),
jobMasterGateway.getFencingToken().toUUID());
// the slot should be freed
- assertThat(availableSlotFuture.get().f1, is(slotId));
- assertThat(availableSlotFuture.get().f2, is(allocationId));
+ assertThat(availableSlotFuture.get().f1).isEqualTo(slotId);
+ assertThat(availableSlotFuture.get().f2).isEqualTo(allocationId);
// all job partitions should be released
- assertThat(jobPartitionsReleaseFuture.get(), is(jobId));
+ assertThat(jobPartitionsReleaseFuture.get()).isEqualTo(jobId);
Review Comment:
```suggestion
assertThat(jobPartitionsReleaseFuture).isCompletedWithValue(jobId);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -3086,13 +3058,13 @@ public void testSharedResourcesLifecycle() throws
Exception {
ctx.taskExecutor.cancelTask(executions.get(i), timeout).get();
waitForTasks(ctx, numTasks -> numTasks > numRemaining);
if (numRemaining > 0) {
- assertEquals(0,
SharedResourceCollectingInvokable.timesDeallocated.get());
+
assertThat(SharedResourceCollectingInvokable.timesDeallocated.get()).isZero();
Review Comment:
```suggestion
assertThat(SharedResourceCollectingInvokable.timesDeallocated).hasValue(0);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2234,19 +2208,20 @@ public void testDisconnectFromJobMasterWhenNewLeader()
throws Exception {
jobManagerLeaderRetriever.notifyListener(
jobMasterGateway.getAddress(), UUID.randomUUID());
- assertThat(offeredSlotsFuture.get(), is(1));
+ assertThat(offeredSlotsFuture.get()).isOne();
Review Comment:
```suggestion
assertThat(offeredSlotsFuture).isCompletedWithValue(1);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]