C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381599


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);

Review Comment:
   Ahh, good catch--the "all" is incorrect in that comment. I've updated it 
from "consume all records" to "consume at least the expected number of records".
   
   On a separate note, I believe a similar mixup is the cause of 
https://issues.apache.org/jira/browse/KAFKA-14101, where we _should_ be using 
`consumeAll` but aren't at the moment. I've pushed a commit that should fix 
that.
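
   For readers unfamiliar with the distinction, here is a toy sketch of the two 
consumption styles. It uses an in-memory list as a stand-in for a topic; the 
class and method names (`ConsumeSemanticsSketch`, `consumeAtLeast`, 
`onPollBoundaries`) are hypothetical and are not the embedded cluster's real 
API. The sample data is invented for illustration, and the sketch does not 
claim to reproduce the exact failure in KAFKA-14101.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConsumeSemanticsSketch {

    // Hypothetical stand-in for "consume at least n records": stops reading as
    // soon as n records have been seen, so later records are never inspected.
    public static List<Long> consumeAtLeast(List<Long> log, int n) {
        List<Long> result = new ArrayList<>();
        for (Long record : log) {
            result.add(record);
            if (result.size() >= n) {
                break;
            }
        }
        if (result.size() < n) {
            throw new IllegalStateException(
                    "Expected at least " + n + " records but found " + result.size());
        }
        return result;
    }

    // Hypothetical stand-in for "consume all": snapshots the end of the log up
    // front and returns every record before that point.
    public static List<Long> consumeAll(List<Long> log) {
        int endOffset = log.size();
        return new ArrayList<>(log.subList(0, endOffset));
    }

    // True if every sequence number falls on a multiple of recordsPerPoll.
    public static boolean onPollBoundaries(List<Long> seqnos, long recordsPerPoll) {
        return seqnos.stream().allMatch(seqno -> seqno % recordsPerPoll == 0);
    }

    public static void main(String[] args) {
        long recordsPerPoll = 100;
        // Invented sample data: the last committed offset is off-boundary.
        List<Long> offsetsLog = Arrays.asList(100L, 200L, 250L);

        // Reading a single record misses the bad commit at the end of the log.
        System.out.println(onPollBoundaries(consumeAtLeast(offsetsLog, 1), recordsPerPoll));
        // Reading everything up to the end of the log catches it.
        System.out.println(onPollBoundaries(consumeAll(offsetsLog), recordsPerPoll));
    }
}
```

   In this sketch the first check passes and the second fails, which is the 
kind of gap that reading only the expected number of records can leave open.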



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just need to read one offset record
-            ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
-                    .consume(
-                            1,
+            ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
                             Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                            null,
                             offsetsTopic
-                    ).iterator().next();
-            long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
             // also consume from the cluster's global offsets topic; again, just need to read one offset record

Review Comment:
   Right again, thanks for the catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.