This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7aefdf2c6aefe8c24af30a4f28a59f2780503d21 Author: Arvid Heise <[email protected]> AuthorDate: Thu Mar 17 22:07:58 2022 +0100 [refactor][streaming] Migrate Source(Operator)StreamTaskTest to JUnit5 and assertj --- .../tasks/SourceOperatorStreamTaskTest.java | 61 +++++++------- .../runtime/tasks/SourceStreamTaskTest.java | 93 +++++++++++----------- .../runtime/tasks/SourceStreamTaskTestBase.java | 23 +++--- 3 files changed, 85 insertions(+), 92 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index 3d9087f..0ea212a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -59,7 +59,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor.LifeCyclePhase; import org.apache.flink.util.SerializedValue; -import org.junit.Test; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.io.Serializable; @@ -75,29 +76,24 @@ import java.util.function.Supplier; import java.util.stream.IntStream; import static org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests for verifying that the {@link SourceOperator} as a task input can be integrated well with * {@link org.apache.flink.streaming.runtime.io.StreamOneInputProcessor}. */ -public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { +class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { + private static final OperatorID OPERATOR_ID = new OperatorID(); private static final int NUM_RECORDS = 10; @Test - public void testMetrics() throws Exception { + void testMetrics() throws Exception { testMetrics( SourceOperatorStreamTask::new, new SourceOperatorFactory<>( new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), - lessThanOrEqualTo(1_000_000d)); + busyTime -> busyTime.isLessThanOrEqualTo(1_000_000d)); } /** @@ -105,7 +101,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { * operators. */ @Test - public void testSnapshotAndRestore() throws Exception { + void testSnapshotAndRestore() throws Exception { // process NUM_RECORDS records and take a snapshot. TaskStateSnapshot taskStateSnapshot = executeAndWaitForCheckpoint(1, null, IntStream.range(0, NUM_RECORDS)); @@ -116,7 +112,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testSnapshotAndAdvanceToEndOfEventTime() throws Exception { + void testSnapshotAndAdvanceToEndOfEventTime() throws Exception { final int checkpointId = 1; try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness(checkpointId, null)) { @@ -139,7 +135,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception { + void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception { try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) { testHarness.processAll(); testHarness.finishProcessing(); @@ -147,22 +143,22 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { Queue<Object> expectedOutput = new LinkedList<>(); expectedOutput.add(Watermark.MAX_WATERMARK); expectedOutput.add(new EndOfData(StopMode.DRAIN)); - assertThat(testHarness.getOutput().toArray(), equalTo(expectedOutput.toArray())); + assertThat(testHarness.getOutput().toArray()).isEqualTo(expectedOutput.toArray()); } } @Test - public void testNotEmittingMaxWatermarkAfterCancelling() throws Exception { + void testNotEmittingMaxWatermarkAfterCancelling() throws Exception { try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) { testHarness.getStreamTask().cancel(); testHarness.finishProcessing(); - assertThat(testHarness.getOutput(), hasSize(0)); + assertThat(testHarness.getOutput()).hasSize(0); } } @Test - public void testExternallyInducedSource() throws Exception { + void testExternallyInducedSource() throws Exception { final int numEventsBeforeCheckpoint = 10; final int totalNumEvents = 20; TestingExternallyInducedSourceReader testingReader = @@ -176,17 +172,18 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { testHarness.processAll(); - assertEquals(totalNumEvents, runtimeTestingReader.numEmittedEvents); - assertTrue(runtimeTestingReader.checkpointed); - assertEquals( - TestingExternallyInducedSourceReader.CHECKPOINT_ID, - runtimeTestingReader.checkpointedId); - assertEquals(numEventsBeforeCheckpoint, runtimeTestingReader.checkpointedAt); + assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(totalNumEvents); + assertThat(runtimeTestingReader.checkpointed).isTrue(); + assertThat(runtimeTestingReader.checkpointedId) + .isEqualTo(TestingExternallyInducedSourceReader.CHECKPOINT_ID); + assertThat(runtimeTestingReader.checkpointedAt).isEqualTo(numEventsBeforeCheckpoint); + Assertions.assertThat(testHarness.getOutput()) + .contains(new CheckpointBarrier(2, 2, checkpointOptions)); } } @Test - public void testSkipExecutionIfFinishedOnRestore() throws Exception { + void testSkipExecutionIfFinishedOnRestore() throws Exception { TaskStateSnapshot taskStateSnapshot = TaskStateSnapshot.FINISHED_ON_RESTORE; LifeCycleMonitorSource testingSource = @@ -215,7 +212,8 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { testHarness.getStreamTask().invoke(); testHarness.processAll(); - assertThat(output, contains(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN))); + assertThat(output) + .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)); LifeCycleMonitorSourceReader sourceReader = (LifeCycleMonitorSourceReader) @@ -226,7 +224,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testTriggeringStopWithSavepointWithDrain() throws Exception { + void testTriggeringStopWithSavepointWithDrain() throws Exception { SourceOperatorFactory<Integer> sourceOperatorFactory = new SourceOperatorFactory<>( new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2), @@ -271,9 +269,9 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { testHarness.waitForTaskCompletion(); testHarness.finishProcessing(); - assertTrue(triggerResult.isDone()); - assertTrue(triggerResult.get()); - assertTrue(checkpointCompleted.isDone()); + assertThat(triggerResult.isDone()).isTrue(); + assertThat(triggerResult.get()).isTrue(); + assertThat(checkpointCompleted.isDone()).isTrue(); } } @@ -304,7 +302,8 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { expectedOutput.add( new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions)); - assertEquals(checkpointId, testHarness.taskStateManager.getReportedCheckpointId()); + assertThat(testHarness.taskStateManager.getReportedCheckpointId()) + .isEqualTo(checkpointId); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); return testHarness.taskStateManager.getLastJobManagerTaskStateSnapshot(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index aa4b364..d16c5ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -70,8 +70,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.CheckedSupplier; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.io.Serializable; @@ -94,23 +93,18 @@ import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO; import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; import static org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest.triggerCheckpoint; import static org.apache.flink.util.Preconditions.checkState; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** * These tests verify that the RichFunction methods are called (in correct order). And that * checkpointing/element emission don't occur concurrently. */ -public class SourceStreamTaskTest extends SourceStreamTaskTestBase { +class SourceStreamTaskTest extends SourceStreamTaskTestBase { /** This test verifies that open() and close() are correctly called by the StreamTask. */ @Test - public void testOpenClose() throws Exception { + void testOpenClose() throws Exception { final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO); @@ -124,15 +118,17 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { testHarness.invoke(); testHarness.waitForTaskCompletion(); - assertTrue("RichFunction methods where not called.", OpenCloseTestSource.closeCalled); + assertThat(OpenCloseTestSource.closeCalled) + .as("RichFunction methods where not called.") + .isTrue(); List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); - Assert.assertEquals(10, resultElements.size()); + assertThat(resultElements.size()).isEqualTo(10); } @Test - public void testMetrics() throws Exception { + void testMetrics() throws Exception { testMetrics( SourceStreamTask::new, SimpleOperatorFactory.of( @@ -140,7 +136,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { new CancelTestSource( INT_TYPE_INFO.createSerializer(new ExecutionConfig()), 42))), - is(Double.NaN)); + busyTime -> busyTime.isNaN()); } /** @@ -157,7 +153,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { */ @Test @SuppressWarnings("unchecked") - public void testCheckpointing() throws Exception { + void testCheckpointing() throws Exception { final int numElements = 100; final int numCheckpoints = 100; final int numCheckpointers = 1; @@ -214,14 +210,14 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); - Assert.assertEquals(numElements, resultElements.size()); + assertThat(resultElements.size()).isEqualTo(numElements); } finally { executor.shutdown(); } } @Test - public void testClosingAllOperatorsOnChainProperly() throws Exception { + void testClosingAllOperatorsOnChainProperly() throws Exception { final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO); @@ -254,11 +250,11 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { new StreamRecord<>("[Operator1]: Finish")); final Object[] output = testHarness.getOutput().toArray(); - assertArrayEquals("Output was not correct.", expected.toArray(), output); + assertThat(output).as("Output was not correct.").isEqualTo(expected.toArray()); } @Test - public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception { + void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception { final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO); @@ -299,22 +295,22 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testCancellationWithSourceBlockedOnLock() throws Exception { + void testCancellationWithSourceBlockedOnLock() throws Exception { testCancellationWithSourceBlockedOnLock(false, false); } @Test - public void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception { + void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception { testCancellationWithSourceBlockedOnLock(true, false); } @Test - public void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception { + void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception { testCancellationWithSourceBlockedOnLock(false, true); } @Test - public void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError() + void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError() throws Exception { testCancellationWithSourceBlockedOnLock(true, true); } @@ -324,8 +320,8 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { * StreamTask} which, as of the time this test is being written, is not tested anywhere else * (like {@link StreamTaskTest} or {@link OneInputStreamTaskTest}). */ - public void testCancellationWithSourceBlockedOnLock( - boolean withPendingMail, boolean throwInCancel) throws Exception { + void testCancellationWithSourceBlockedOnLock(boolean withPendingMail, boolean throwInCancel) + throws Exception { final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO); @@ -354,9 +350,9 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { .createExecutor(0) .execute( () -> - assertFalse( - "This should never execute before task cancelation", - testHarness.getTask().isRunning()), + assertThat(testHarness.getTask().isRunning()) + .as("This should never execute before task cancelation") + .isFalse(), "Test"); } @@ -427,12 +423,12 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testInterruptionExceptionNotSwallowed() throws Exception { + void testInterruptionExceptionNotSwallowed() throws Exception { testInterruptionExceptionNotSwallowed(InterruptedException::new); } @Test - public void testWrappedInterruptionExceptionNotSwallowed() throws Exception { + void testWrappedInterruptionExceptionNotSwallowed() throws Exception { testInterruptionExceptionNotSwallowed( () -> new RuntimeException(new FlinkRuntimeException(new InterruptedException()))); } @@ -491,7 +487,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testWaitsForSourceThreadOnCancel() throws Exception { + void testWaitsForSourceThreadOnCancel() throws Exception { StreamTaskTestHarness<String> harness = new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO); @@ -504,13 +500,13 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { // SourceStreamTask should be still waiting for NonStoppingSource after cancellation harness.getTask().cancel(); harness.waitForTaskCompletion(50, true); // allow task to exit prematurely - assertTrue(harness.taskThread.isAlive()); + assertThat(harness.taskThread.isAlive()).isTrue(); // SourceStreamTask should be still waiting for NonStoppingSource after interruptions for (int i = 0; i < 10; i++) { harness.getTask().maybeInterruptOnCancel(harness.getTaskThread(), null, null); harness.waitForTaskCompletion(50, true); // allow task to exit prematurely - assertTrue(harness.taskThread.isAlive()); + assertThat(harness.taskThread.isAlive()).isTrue(); } // It should only exit once NonStoppingSource allows for it @@ -519,7 +515,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception { + void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception { ResultPartition[] partitionWriters = new ResultPartition[2]; try (NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder() @@ -585,12 +581,12 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { (id, error) -> testHarness.getStreamTask().notifyCheckpointCompleteAsync(2)); testHarness.finishProcessing(); - assertTrue(checkpointFuture.isDone()); + assertThat(checkpointFuture.isDone()).isTrue(); // Each result partition should have emitted 1 barrier, 1 max watermark and 1 // EndOfUserRecordEvent. for (ResultPartition resultPartition : partitionWriters) { - assertEquals(3, resultPartition.getNumberOfQueuedBuffers()); + assertThat(resultPartition.getNumberOfQueuedBuffers()).isEqualTo(3); } } } finally { @@ -603,7 +599,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testClosedOnRestoreSourceSkipExecution() throws Exception { + void testClosedOnRestoreSourceSkipExecution() throws Exception { LifeCycleMonitorSource testSource = new LifeCycleMonitorSource(); List<Object> output = new ArrayList<>(); try (StreamTaskMailboxTestHarness<String> harness = @@ -626,7 +622,8 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { harness.processAll(); harness.streamTask.getCompletionFuture().get(); - assertThat(output, contains(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN))); + assertThat(output) + .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)); LifeCycleMonitorSource source = (LifeCycleMonitorSource) @@ -642,7 +639,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } @Test - public void testTriggeringStopWithSavepointWithDrain() throws Exception { + void testTriggeringStopWithSavepointWithDrain() throws Exception { SourceFunction<String> testSource = new EmptySource(); CompletableFuture<Boolean> checkpointCompleted = new CompletableFuture<>(); @@ -684,9 +681,9 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { harness.streamTask.runMailboxLoop(); harness.finishProcessing(); - assertTrue(triggerResult.isDone()); - assertTrue(triggerResult.get()); - assertTrue(checkpointCompleted.isDone()); + assertThat(triggerResult.isDone()).isTrue(); + assertThat(triggerResult.get()).isTrue(); + assertThat(checkpointCompleted.isDone()).isTrue(); } } @@ -767,7 +764,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { public List<Serializable> snapshotState(long checkpointId, long timestamp) throws Exception { if (!semaphore.tryAcquire()) { - Assert.fail("Concurrent invocation of snapshotState."); + fail("Concurrent invocation of snapshotState."); } int startCount = count; lastCheckpointId = checkpointId; @@ -780,7 +777,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { if (startCount != count) { semaphore.release(); // This means that next() was invoked while the snapshot was ongoing - Assert.fail("Count is different at start end end of snapshot."); + fail("Count is different at start end end of snapshot."); } semaphore.release(); return Collections.singletonList(sum); @@ -871,7 +868,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { public void open(Configuration parameters) throws Exception { super.open(parameters); if (closeCalled) { - Assert.fail("Close called before open."); + fail("Close called before open."); } openCalled = true; } @@ -880,7 +877,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { public void close() throws Exception { super.close(); if (!openCalled) { - Assert.fail("Open was not called before close."); + fail("Open was not called before close."); } closeCalled = true; } @@ -888,7 +885,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { @Override public void run(SourceContext<String> ctx) throws Exception { if (!openCalled) { - Assert.fail("Open was not called before run."); + fail("Open was not called before run."); } for (int i = 0; i < 10; i++) { ctx.collect("Hello" + i); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java index 3065641..b477796 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java @@ -32,16 +32,15 @@ import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; -import org.hamcrest.Matcher; +import org.assertj.core.api.AbstractDoubleAssert; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.function.Consumer; import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Common base class for testing source tasks. */ public class SourceStreamTaskTestBase { @@ -49,7 +48,7 @@ public class SourceStreamTaskTestBase { FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception> taskFactory, StreamOperatorFactory<?> operatorFactory, - Matcher<Double> busyTimeMatcher) + Consumer<AbstractDoubleAssert<?>> busyTimeMatcher) throws Exception { long sleepTime = 42; @@ -73,26 +72,24 @@ public class SourceStreamTaskTestBase { OneShotLatch checkpointAcknowledgeLatch = new OneShotLatch(); harness.getCheckpointResponder().setAcknowledgeLatch(checkpointAcknowledgeLatch); - assertFalse(triggerFuture.isDone()); + assertThat(triggerFuture).isNotDone(); Thread.sleep(sleepTime); while (!triggerFuture.isDone()) { harness.streamTask.runMailboxStep(); } Gauge<Long> checkpointStartDelayGauge = (Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME); - assertThat( - checkpointStartDelayGauge.getValue(), - greaterThanOrEqualTo(sleepTime * 1_000_000)); + assertThat(checkpointStartDelayGauge.getValue()) + .isGreaterThanOrEqualTo(sleepTime * 1_000_000); Gauge<Double> busyTimeGauge = (Gauge<Double>) metrics.get(MetricNames.TASK_BUSY_TIME); - assertThat(busyTimeGauge.getValue(), busyTimeMatcher); + busyTimeMatcher.accept(assertThat(busyTimeGauge.getValue())); checkpointAcknowledgeLatch.await(); TestCheckpointResponder.AcknowledgeReport acknowledgeReport = Iterables.getOnlyElement( harness.getCheckpointResponder().getAcknowledgeReports()); - assertThat( - acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos(), - greaterThanOrEqualTo(sleepTime * 1_000_000)); + assertThat(acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos()) + .isGreaterThanOrEqualTo(sleepTime * 1_000_000); } } }
