This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dd6fcc650ed KAFKA-16901 Add unit tests for ConsumerRecords#records(String) (#16227)
dd6fcc650ed is described below
commit dd6fcc650ed56eb8483de7ab0ab06d8b6b0de2d1
Author: TingIāu "Ting" Kì <[email protected]>
AuthorDate: Thu Jun 13 14:35:33 2024 +0800
KAFKA-16901 Add unit tests for ConsumerRecords#records(String) (#16227)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../clients/consumer/ConsumerRecordsTest.java | 165 ++++++++++++++++++---
1 file changed, 142 insertions(+), 23 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
index ea870c24781..2473fd9be28 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.clients.consumer;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -31,32 +31,151 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class ConsumerRecordsTest {
@Test
- public void iterator() throws Exception {
+ public void testIterator() {
+ String topic = "topic";
+ int recordSize = 10;
+ int partitionSize = 15;
+ int emptyPartitionIndex = 3;
+ ConsumerRecords<Integer, String> records =
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex,
Collections.singleton(topic));
+ Iterator<ConsumerRecord<Integer, String>> iterator =
records.iterator();
- Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records =
new LinkedHashMap<>();
+ int recordCount = 0;
+ int partitionCount = 0;
+ int currentPartition = -1;
- String topic = "topic";
- records.put(new TopicPartition(topic, 0), new ArrayList<>());
- ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic,
1, 0, 0L, TimestampType.CREATE_TIME,
- 0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
- ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic,
1, 1, 0L, TimestampType.CREATE_TIME,
- 0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
- records.put(new TopicPartition(topic, 1), Arrays.asList(record1,
record2));
- records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
- ConsumerRecords<Integer, String> consumerRecords = new
ConsumerRecords<>(records);
- Iterator<ConsumerRecord<Integer, String>> iter =
consumerRecords.iterator();
-
- int c = 0;
- for (; iter.hasNext(); c++) {
- ConsumerRecord<Integer, String> record = iter.next();
- assertEquals(1, record.partition());
- assertEquals(topic, record.topic());
- assertEquals(c, record.offset());
+ while (iterator.hasNext()) {
+ ConsumerRecord<Integer, String> record = iterator.next();
+ validateEmptyPartition(record, emptyPartitionIndex);
+
+ // Check if we have moved to a new partition
+ if (currentPartition != record.partition()) {
+ // Increment the partition count as we have encountered a new partition
+ partitionCount++;
+ // Update the current partition to the new partition
+ currentPartition = record.partition();
+ }
+
+ validateRecordPayload(topic, record, currentPartition,
recordCount, recordSize);
+ recordCount++;
+ }
+
+ // Including empty partition
+ assertEquals(partitionSize, partitionCount + 1);
+ }
+
+ @Test
+ public void testRecordsByPartition() {
+ List<String> topics = Arrays.asList("topic1", "topic2");
+ int recordSize = 3;
+ int partitionSize = 5;
+ int emptyPartitionIndex = 2;
+
+ ConsumerRecords<Integer, String> consumerRecords =
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+ for (String topic : topics) {
+ for (int partition = 0; partition < partitionSize; partition++) {
+ TopicPartition topicPartition = new TopicPartition(topic,
partition);
+ List<ConsumerRecord<Integer, String>> records =
consumerRecords.records(topicPartition);
+
+ if (partition == emptyPartitionIndex) {
+ assertTrue(records.isEmpty());
+ } else {
+ assertEquals(recordSize, records.size());
+ for (int i = 0; i < records.size(); i++) {
+ ConsumerRecord<Integer, String> record =
records.get(i);
+ validateRecordPayload(topic, record, partition, i,
recordSize);
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testRecordsByNullTopic() {
+ String nullTopic = null;
+ ConsumerRecords<Integer, String> consumerRecords =
ConsumerRecords.empty();
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () ->
consumerRecords.records(nullTopic));
+ assertEquals("Topic must be non-null.", exception.getMessage());
+ }
+
+
+ @Test
+ public void testRecordsByTopic() {
+ List<String> topics = Arrays.asList("topic1", "topic2", "topic3",
"topic4");
+ int recordSize = 3;
+ int partitionSize = 10;
+ int emptyPartitionIndex = 6;
+ int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize -
1);
+
+ ConsumerRecords<Integer, String> consumerRecords =
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+ for (String topic : topics) {
+ Iterable<ConsumerRecord<Integer, String>> records =
consumerRecords.records(topic);
+ int recordCount = 0;
+ int partitionCount = 0;
+ int currentPartition = -1;
+
+ for (ConsumerRecord<Integer, String> record : records) {
+ validateEmptyPartition(record, emptyPartitionIndex);
+
+ // Check if we have moved to a new partition
+ if (currentPartition != record.partition()) {
+ // Increment the partition count as we have encountered a new partition
+ partitionCount++;
+ // Update the current partition to the new partition
+ currentPartition = record.partition();
+ }
+
+ validateRecordPayload(topic, record, currentPartition,
recordCount, recordSize);
+ recordCount++;
+ }
+
+ // Including empty partition
+ assertEquals(partitionSize, partitionCount + 1);
+ assertEquals(expectedTotalRecordSizeOfEachTopic, recordCount);
}
- assertEquals(2, c);
+ }
+
+ private ConsumerRecords<Integer, String> buildTopicTestRecords(int
recordSize,
+ int
partitionSize,
+ int
emptyPartitionIndex,
+
Collection<String> topics) {
+ Map<TopicPartition, List<ConsumerRecord<Integer, String>>>
partitionToRecords = new LinkedHashMap<>();
+ for (String topic : topics) {
+ for (int i = 0; i < partitionSize; i++) {
+ List<ConsumerRecord<Integer, String>> records = new
ArrayList<>(recordSize);
+ if (i != emptyPartitionIndex) {
+ for (int j = 0; j < recordSize; j++) {
+ records.add(
+ new ConsumerRecord<>(topic, i, j, 0L,
TimestampType.CREATE_TIME,
+ 0, 0, j, String.valueOf(j), new
RecordHeaders(), Optional.empty())
+ );
+ }
+ }
+ partitionToRecords.put(new TopicPartition(topic, i), records);
+ }
+ }
+
+ return new ConsumerRecords<>(partitionToRecords);
+ }
+
+ private void validateEmptyPartition(ConsumerRecord<Integer, String>
record, int emptyPartitionIndex) {
+ assertNotEquals(emptyPartitionIndex, record.partition(), "Partition "
+ record.partition() + " is not empty");
+ }
+
+ private void validateRecordPayload(String topic, ConsumerRecord<Integer,
String> record, int currentPartition, int recordCount, int recordSize) {
+ assertEquals(topic, record.topic());
+ assertEquals(currentPartition, record.partition());
+ assertEquals(recordCount % recordSize, record.offset());
+ assertEquals(recordCount % recordSize, record.key());
+ assertEquals(String.valueOf(recordCount % recordSize), record.value());
}
}