C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381599
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
recordNum >= recordsProduced);
Review Comment:
Ahh, good catch--the "all" is incorrect in that comment. I've updated it
from "consume all records" to "consume at least the expected number of records".
On a separate note, I believe a similar mixup is the cause of
https://issues.apache.org/jira/browse/KAFKA-14101, where we _should_ be using
`consumeAll` but aren't at the moment. I've pushed a commit that should fix
that.
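To illustrate why the two modes are not interchangeable, here is a minimal, self-contained sketch. It is not the actual `EmbeddedKafkaCluster` API: the class, the in-memory "topic", and the helpers `consumeAtLeast`, `consumeAll`, and `allOnPollBoundaries` are all hypothetical stand-ins, and the toy `consumeAtLeast` returns exactly the first `n` records for determinism. The point is only that a check run on the first record can pass while the same check run on every record fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Kafka Connect.
class ConsumeSemanticsSketch {

    // Stand-in for "consume at least n records": stops once n records are read.
    // Simplification: returns exactly the first n records.
    static List<Long> consumeAtLeast(int n, List<Long> topic) {
        if (topic.size() < n) {
            throw new IllegalStateException(
                "Expected at least " + n + " records but found " + topic.size());
        }
        return new ArrayList<>(topic.subList(0, n));
    }

    // Stand-in for "consume all records": reads through to the end of the topic.
    static List<Long> consumeAll(List<Long> topic) {
        return new ArrayList<>(topic);
    }

    // True only if every sequence number is a multiple of the poll size.
    static boolean allOnPollBoundaries(List<Long> seqnos, long recordsPerPoll) {
        return seqnos.stream().allMatch(seqno -> seqno % recordsPerPoll == 0);
    }

    public static void main(String[] args) {
        long recordsPerPoll = 100;
        // The third committed offset is deliberately off-boundary.
        List<Long> offsetsTopic = Arrays.asList(100L, 200L, 250L);

        // Reading a single record misses the bad commit: prints "true".
        System.out.println(
            allOnPollBoundaries(consumeAtLeast(1, offsetsTopic), recordsPerPoll));
        // Reading everything catches it: prints "false".
        System.out.println(
            allOnPollBoundaries(consumeAll(offsetsTopic), recordsPerPoll));
    }
}
```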
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
recordNum >= recordsProduced);
// also consume from the connector's dedicated offsets topic; just need to read one offset record
- ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
- .consume(
- 1,
+ ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+ .consumeAll(
TimeUnit.MINUTES.toMillis(1),
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
offsetsTopic
- ).iterator().next();
- long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
- assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
- 0, seqno % recordsProduced);
+ );
+ List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+ seqnos.forEach(seqno ->
+ assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+ 0, seqno % recordsProduced)
+ );
// also consume from the cluster's global offsets topic; again, just need to read one offset record
Review Comment:
Right again, thanks for the catch!