afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r876439486
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -247,12 +249,18 @@ public void
testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throw
DeliveryGuarantee.EXACTLY_ONCE,
2,
(records) ->
- assertThat(
- records,
- contains(
- LongStream.range(1,
lastCheckpointedRecord.get().get() + 1)
- .boxed()
- .toArray())));
+ assertThat(records)
+ .satisfies(
Review Comment:
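Could we use AssertJ's `containsExactly` (the equivalent of Hamcrest's `contains`) directly here instead of going through `satisfies(...)`?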
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -286,12 +293,13 @@ public void
testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
new FailingCheckpointMapper(failed, lastCheckpointedRecord),
config, "newPrefix");
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, true);
- assertThat(
- deserializeValues(collectedRecords),
- contains(
- LongStream.range(1, lastCheckpointedRecord.get().get()
+ 1)
- .boxed()
- .toArray()));
+ assertThat(deserializeValues(collectedRecords))
+ .satisfies(
Review Comment:
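Could we use AssertJ's `containsExactly` (the equivalent of Hamcrest's `contains`) directly here instead of going through `satisfies(...)`?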
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -165,13 +167,13 @@ public void testAsyncErrorRethrownOnInvoke() throws
Throwable {
testHarness.processElement(new StreamRecord<>("msg-2"));
} catch (Exception e) {
// the next invoke should rethrow the async exception
- Assert.assertTrue(e.getCause().getMessage().contains("artificial
async exception"));
+ assertThat(e.getCause().getMessage()).contains("artificial async
exception");
Review Comment:
Could this use `assertThatThrownBy(...)` instead of asserting on the exception inside a try/catch block?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java:
##########
@@ -679,10 +666,9 @@ private void testFailingConsumerLifecycle(
"Exception should have been thrown from open / run method
of FlinkKafkaConsumerBase.");
} catch (Exception e) {
assertThat(
- ExceptionUtils.findThrowable(
- e, throwable ->
throwable.equals(expectedException))
- .isPresent(),
- is(true));
+ ExceptionUtils.findThrowable(
Review Comment:
Could this use `assertThatThrownBy(...)` instead of asserting on the exception inside a try/catch block?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -227,68 +225,4 @@ public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
-
Review Comment:
Why is the code removed in this hunk no longer needed?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##########
@@ -395,11 +393,14 @@ void testAssigningEmptySplits() throws Exception {
new TestingReaderOutput<>(),
() -> reader.getNumAliveFetchers() == 0,
"The split fetcher did not exit before timeout.");
- MatcherAssert.assertThat(
- finishedSplits,
- Matchers.containsInAnyOrder(
-
KafkaPartitionSplit.toSplitId(normalSplit.getTopicPartition()),
-
KafkaPartitionSplit.toSplitId(emptySplit.getTopicPartition())));
+ assertThat(finishedSplits)
+ .satisfies(
Review Comment:
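Could we use AssertJ's `containsExactlyInAnyOrder` (the equivalent of Hamcrest's `containsInAnyOrder`) directly here instead of going through `satisfies(...)`?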
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -408,10 +416,11 @@ private void writeRecordsToKafka(
drainAllRecordsFromTopic(
topic, deliveryGuarantee ==
DeliveryGuarantee.EXACTLY_ONCE);
final long recordsCount = expectedRecords.get().get();
- assertEquals(collectedRecords.size(), recordsCount);
- assertThat(
- deserializeValues(collectedRecords),
- contains(LongStream.range(1, recordsCount +
1).boxed().toArray()));
+ assertThat(recordsCount).isEqualTo(collectedRecords.size());
Review Comment:
Could we use AssertJ's `containsExactly` (the equivalent of Hamcrest's `contains`) directly here?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -315,12 +322,13 @@ public void
testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exce
new FailingCheckpointMapper(failed, lastCheckpointedRecord),
config, null);
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, true);
- assertThat(
- deserializeValues(collectedRecords),
- contains(
- LongStream.range(1, lastCheckpointedRecord.get().get()
+ 1)
- .boxed()
- .toArray()));
+ assertThat(deserializeValues(collectedRecords))
+ .satisfies(
Review Comment:
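Could we use AssertJ's `containsExactly` (the equivalent of Hamcrest's `contains`) directly here instead of going through `satisfies(...)`?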
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -233,12 +229,18 @@ public void testRecoveryWithExactlyOnceGuarantee() throws
Exception {
DeliveryGuarantee.EXACTLY_ONCE,
1,
(records) ->
- assertThat(
- records,
- contains(
- LongStream.range(1,
lastCheckpointedRecord.get().get() + 1)
- .boxed()
- .toArray())));
+ assertThat(records)
+ .satisfies(
Review Comment:
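Could we use AssertJ's `containsExactly` (the equivalent of Hamcrest's `contains`) directly here instead of going through `satisfies(...)`?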
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -77,21 +78,26 @@ public void testGetTransactions() {
final KafkaTransactionLog transactionLog =
new KafkaTransactionLog(getKafkaClientConfiguration());
final List<TransactionRecord> transactions =
transactionLog.getTransactions();
- assertThat(
- transactions,
- containsInAnyOrder(
- new TransactionRecord(buildTransactionalId(1), Empty),
- new TransactionRecord(buildTransactionalId(1),
Ongoing),
- new TransactionRecord(buildTransactionalId(1),
PrepareCommit),
- new TransactionRecord(buildTransactionalId(1),
CompleteCommit),
- new TransactionRecord(buildTransactionalId(2), Empty),
- new TransactionRecord(buildTransactionalId(2),
Ongoing),
- new TransactionRecord(buildTransactionalId(2),
PrepareAbort),
- new TransactionRecord(buildTransactionalId(2),
CompleteAbort),
- new TransactionRecord(buildTransactionalId(3), Empty),
- new TransactionRecord(buildTransactionalId(3),
Ongoing),
- new TransactionRecord(buildTransactionalId(4), Empty),
- new TransactionRecord(buildTransactionalId(4),
Ongoing)));
+ assertThat(transactions)
+ .satisfies(
Review Comment:
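Could we use AssertJ's `containsExactlyInAnyOrder` (the equivalent of Hamcrest's `containsInAnyOrder`) directly here instead of going through `satisfies(...)`?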
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -202,13 +204,13 @@ public void testAsyncErrorRethrownOnCheckpoint() throws
Throwable {
testHarness.snapshot(123L, 123L);
} catch (Exception e) {
// the next invoke should rethrow the async exception
- Assert.assertTrue(e.getCause().getMessage().contains("artificial
async exception"));
+ assertThat(e.getCause().getMessage()).contains("artificial async
exception");
Review Comment:
Could this use `assertThatThrownBy(...)` instead of asserting on the exception inside a try/catch block?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -266,14 +268,14 @@ public void go() throws Exception {
snapshotThread.sync();
} catch (Exception e) {
// the snapshot should have failed with the async exception
- Assert.assertTrue(
- e.getCause().getMessage().contains("artificial async
failure for 2nd message"));
+ assertThat(e.getCause().getMessage())
Review Comment:
Could this use `assertThatThrownBy(...)` instead of asserting on the exception inside a try/catch block?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]