This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dd6fcc650ed KAFKA-16901 Add unit tests for 
ConsumerRecords#records(String) (#16227)
dd6fcc650ed is described below

commit dd6fcc650ed56eb8483de7ab0ab06d8b6b0de2d1
Author: TingIāu "Ting" Kì <[email protected]>
AuthorDate: Thu Jun 13 14:35:33 2024 +0800

    KAFKA-16901 Add unit tests for ConsumerRecords#records(String) (#16227)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../clients/consumer/ConsumerRecordsTest.java      | 165 ++++++++++++++++++---
 1 file changed, 142 insertions(+), 23 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
index ea870c24781..2473fd9be28 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -31,32 +31,151 @@ import 
org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class ConsumerRecordsTest {
 
     @Test
-    public void iterator() throws Exception {
+    public void testIterator() {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 15;
+        int emptyPartitionIndex = 3;
+        ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+        Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
 
-        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
+        int recordCount = 0;
+        int partitionCount = 0;
+        int currentPartition = -1;
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-            0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-            0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = 
consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
-            assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+        while (iterator.hasNext()) {
+            ConsumerRecord<Integer, String> record = iterator.next();
+            validateEmptyPartition(record, emptyPartitionIndex);
+
+            // Check if we have moved to a new partition
+            if (currentPartition != record.partition()) {
+                // Increment the partition count as we have encountered a new 
partition
+                partitionCount++;
+                // Update the current partition to the new partition
+                currentPartition = record.partition();
+            }
+
+            validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+            recordCount++;
+        }
+
+        // Including empty partition
+        assertEquals(partitionSize, partitionCount + 1);
+    }
+
+    @Test
+    public void testRecordsByPartition() {
+        List<String> topics = Arrays.asList("topic1", "topic2");
+        int recordSize = 3;
+        int partitionSize = 5;
+        int emptyPartitionIndex = 2;
+
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            for (int partition = 0; partition < partitionSize; partition++) {
+                TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+                List<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topicPartition);
+
+                if (partition == emptyPartitionIndex) {
+                    assertTrue(records.isEmpty());
+                } else {
+                    assertEquals(recordSize, records.size());
+                    for (int i = 0; i < records.size(); i++) {
+                        ConsumerRecord<Integer, String> record = 
records.get(i);
+                        validateRecordPayload(topic, record, partition, i, 
recordSize);
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testRecordsByNullTopic() {
+        String nullTopic = null;
+        ConsumerRecords<Integer, String> consumerRecords = 
ConsumerRecords.empty();
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
+
+
+    @Test
+    public void testRecordsByTopic() {
+        List<String> topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+        int recordSize = 3;
+        int partitionSize = 10;
+        int emptyPartitionIndex = 6;
+        int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            Iterable<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topic);
+            int recordCount = 0;
+            int partitionCount = 0;
+            int currentPartition = -1;
+
+            for (ConsumerRecord<Integer, String> record : records) {
+                validateEmptyPartition(record, emptyPartitionIndex);
+
+                // Check if we have moved to a new partition
+                if (currentPartition != record.partition()) {
+                    // Increment the partition count as we have encountered a 
new partition
+                    partitionCount++;
+                    // Update the current partition to the new partition
+                    currentPartition = record.partition();
+                }
+
+                validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+                recordCount++;
+            }
+
+            // Including empty partition
+            assertEquals(partitionSize, partitionCount + 1);
+            assertEquals(expectedTotalRecordSizeOfEachTopic, recordCount);
         }
-        assertEquals(2, c);
+    }
+
+    private ConsumerRecords<Integer, String> buildTopicTestRecords(int 
recordSize,
+                                                                   int 
partitionSize,
+                                                                   int 
emptyPartitionIndex,
+                                                                   
Collection<String> topics) {
+        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> 
partitionToRecords = new LinkedHashMap<>();
+        for (String topic : topics) {
+            for (int i = 0; i < partitionSize; i++) {
+                List<ConsumerRecord<Integer, String>> records = new 
ArrayList<>(recordSize);
+                if (i != emptyPartitionIndex) {
+                    for (int j = 0; j < recordSize; j++) {
+                        records.add(
+                            new ConsumerRecord<>(topic, i, j, 0L, 
TimestampType.CREATE_TIME,
+                                0, 0, j, String.valueOf(j), new 
RecordHeaders(), Optional.empty())
+                        );
+                    }
+                }
+                partitionToRecords.put(new TopicPartition(topic, i), records);
+            }
+        }
+
+        return new ConsumerRecords<>(partitionToRecords);
+    }
+
+    private void validateEmptyPartition(ConsumerRecord<Integer, String> 
record, int emptyPartitionIndex) {
+        assertNotEquals(emptyPartitionIndex, record.partition(), "Partition " 
+ record.partition() + " is not empty");
+    }
+
+    private void validateRecordPayload(String topic, ConsumerRecord<Integer, 
String> record, int currentPartition, int recordCount, int recordSize) {
+        assertEquals(topic, record.topic());
+        assertEquals(currentPartition, record.partition());
+        assertEquals(recordCount % recordSize, record.offset());
+        assertEquals(recordCount % recordSize, record.key());
+        assertEquals(String.valueOf(recordCount % recordSize), record.value());
     }
 }

Reply via email to