PatrickRen commented on a change in pull request #18014:
URL: https://github.com/apache/flink/pull/18014#discussion_r771252973
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
##########
@@ -354,20 +354,22 @@ public void testKafkaSourceMetrics() throws Exception {
Matchers.greaterThan(0L));
// Committed offset should be NUM_RECORD_PER_SPLIT
- assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp0,
metricListener));
- assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp1,
metricListener));
+ assertThat(getCommittedOffsetMetric(tp0, metricListener))
+ .isEqualTo(NUM_RECORDS_PER_SPLIT);
+ assertThat(getCommittedOffsetMetric(tp1, metricListener))
+ .isEqualTo(NUM_RECORDS_PER_SPLIT);
// Number of successful commits should be greater than 0
final Optional<Counter> commitsSucceeded =
metricListener.getCounter(
KAFKA_SOURCE_READER_METRIC_GROUP,
COMMITS_SUCCEEDED_METRIC_COUNTER);
- assertTrue(commitsSucceeded.isPresent());
+ assertThat(commitsSucceeded.isPresent()).isTrue();
Review comment:
Nit: You can use `assertThat(Optional).isPresent()` for validating
`Optional`s.
##########
File path:
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java
##########
@@ -140,21 +138,23 @@ public void testAvailableOnEmptyQueue() throws Exception {
}
}
- @Test(timeout = 30000L)
- public void testSnapshot() throws Exception {
+ @Test
+ @Timeout(30)
+ void testSnapshot() throws Exception {
ValidatingSourceOutput output = new ValidatingSourceOutput();
// Add a split to start the fetcher.
List<SplitT> splits =
getSplits(numSplits, NUM_RECORDS_PER_SPLIT,
Boundedness.CONTINUOUS_UNBOUNDED);
try (SourceReader<Integer, SplitT> reader =
consumeRecords(splits, output, totalNumRecords)) {
List<SplitT> state = reader.snapshotState(1L);
- assertEquals("The snapshot should only have 10 splits. ",
numSplits, state.size());
+ assertThat(state)
+ .withFailMessage("The snapshot should only have 10 splits.
")
Review comment:
Use `as()` instead
##########
File path:
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java
##########
@@ -140,21 +138,23 @@ public void testAvailableOnEmptyQueue() throws Exception {
}
}
- @Test(timeout = 30000L)
- public void testSnapshot() throws Exception {
+ @Test
+ @Timeout(30)
+ void testSnapshot() throws Exception {
ValidatingSourceOutput output = new ValidatingSourceOutput();
// Add a split to start the fetcher.
List<SplitT> splits =
getSplits(numSplits, NUM_RECORDS_PER_SPLIT,
Boundedness.CONTINUOUS_UNBOUNDED);
try (SourceReader<Integer, SplitT> reader =
consumeRecords(splits, output, totalNumRecords)) {
List<SplitT> state = reader.snapshotState(1L);
- assertEquals("The snapshot should only have 10 splits. ",
numSplits, state.size());
+ assertThat(state)
+ .withFailMessage("The snapshot should only have 10 splits.
")
+ .hasSize(numSplits);
for (int i = 0; i < numSplits; i++) {
- assertEquals(
- "The first four splits should have been fully
consumed.",
- NUM_RECORDS_PER_SPLIT,
- getNextRecordIndex(state.get(i)));
+ assertThat(getNextRecordIndex(state.get(i)))
+ .withFailMessage("The first four splits should have
been fully consumed.")
Review comment:
Use `as()` instead
##########
File path:
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java
##########
@@ -108,28 +103,31 @@ public void testAddSplitToExistingFetcher() throws
Exception {
}
}
- @Test(timeout = 30000L)
- public void testPollingFromEmptyQueue() throws Exception {
+ @Test
+ @Timeout(30)
+ void testPollingFromEmptyQueue() throws Exception {
ValidatingSourceOutput output = new ValidatingSourceOutput();
List<SplitT> splits =
Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT,
Boundedness.BOUNDED));
// Consume all the records in the split.
try (SourceReader<Integer, SplitT> reader =
consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT)) {
// Now let the main thread poll again.
- assertEquals(
- "The status should be ",
- InputStatus.NOTHING_AVAILABLE,
- reader.pollNext(output));
+ assertThat(reader.pollNext(output))
+ .withFailMessage("The status should be %s",
InputStatus.NOTHING_AVAILABLE)
+ .isEqualTo(InputStatus.NOTHING_AVAILABLE);
}
}
- @Test(timeout = 30000L)
- public void testAvailableOnEmptyQueue() throws Exception {
+ @Test
+ @Timeout(30)
+ void testAvailableOnEmptyQueue() throws Exception {
// Consumer all the records in the split.
try (SourceReader<Integer, SplitT> reader = createReader()) {
CompletableFuture<?> future = reader.isAvailable();
- assertFalse("There should be no records ready for poll.",
future.isDone());
+ assertThat(future.isDone())
+ .withFailMessage("There should be no records ready for
poll.")
Review comment:
I think we should use `assertThat().as()` here instead of
`withFailMessage()`. `withFailMessage()` will override the original generated
"expected xxx but yyy" message, which is helpful for debugging. `as()` is
designed for describing the assertion.
##########
File path:
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java
##########
@@ -207,19 +207,16 @@ public void collect(Integer element, long timestamp) {
public void validate() {
- assertEquals(
- String.format("Should be %d distinct elements in total",
totalNumRecords),
- totalNumRecords,
- consumedValues.size());
- assertEquals(
- String.format("Should be %d elements in total",
totalNumRecords),
- totalNumRecords,
- count);
- assertEquals("The min value should be 0", 0, min);
- assertEquals(
- String.format("The max value should be %d",
totalNumRecords - 1),
- totalNumRecords - 1,
- max);
+ assertThat(consumedValues)
+ .withFailMessage("Should be %d distinct elements in
total", totalNumRecords)
Review comment:
Use `as()` instead
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]