This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 43235c27965 KAFKA-17133 add unit test to make sure
`ConsumerRecords#records` returns immutable object (#16588)
43235c27965 is described below
commit 43235c2796531987282bdf81eac2cc9c3cc32fde
Author: Demonic <[email protected]>
AuthorDate: Tue Jul 16 17:05:13 2024 +0800
KAFKA-17133 add unit test to make sure `ConsumerRecords#records` returns
immutable object (#16588)
Reviewers: TingIāu "Ting" Kì <[email protected]>, Ken Huang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../clients/consumer/ConsumerRecordsTest.java | 30 +++++++++++++++++++++-
1 file changed, 29 insertions(+), 1 deletion(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
index 78f6754d87c..c0678eb4395 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -107,7 +107,6 @@ public class ConsumerRecordsTest {
assertEquals("Topic must be non-null.", exception.getMessage());
}
-
@Test
public void testRecordsByTopic() {
List<String> topics = Arrays.asList("topic1", "topic2", "topic3",
"topic4");
@@ -145,6 +144,35 @@ public class ConsumerRecordsTest {
}
}
+ @Test
+ public void testRecordsAreImmutable() {
+ String topic = "topic";
+ int recordSize = 3;
+ int partitionSize = 6;
+ int emptyPartitionIndex = 2;
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ ConsumerRecord<Integer, String> newRecord = new
ConsumerRecord<>(topic, 0, 0, 0L, TimestampType.CREATE_TIME,
+ 0, 0, 0, "0", new RecordHeaders(), Optional.empty());
+ ConsumerRecords<Integer, String> records =
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex,
Collections.singleton(topic));
+ ConsumerRecords<Integer, String> emptyRecords =
ConsumerRecords.empty();
+
+ // check records(TopicPartition) / partitions by add method
+ // check iterator / records(String) by remove method
+ // check data count after all operations
+ assertThrows(UnsupportedOperationException.class, () ->
records.records(topicPartition).add(newRecord));
+ assertThrows(UnsupportedOperationException.class, () ->
records.partitions().add(topicPartition));
+ assertThrows(UnsupportedOperationException.class, () ->
records.iterator().remove());
+ assertThrows(UnsupportedOperationException.class, () ->
records.records(topic).iterator().remove());
+ assertEquals(recordSize * (partitionSize - 1), records.count());
+
+ // do the same unittest on the empty records
+ assertThrows(UnsupportedOperationException.class, () ->
emptyRecords.records(topicPartition).add(newRecord));
+ assertThrows(UnsupportedOperationException.class, () ->
emptyRecords.partitions().add(topicPartition));
+ assertThrows(UnsupportedOperationException.class, () ->
emptyRecords.iterator().remove());
+ assertThrows(UnsupportedOperationException.class, () ->
emptyRecords.records(topic).iterator().remove());
+ assertEquals(0, emptyRecords.count());
+ }
+
private ConsumerRecords<Integer, String> buildTopicTestRecords(int
recordSize,
int
partitionSize,
int
emptyPartitionIndex,