This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 20ee83c462f KAFKA-17102 FetchRequest#forgottenTopics would return
incorrect data (#16557)
20ee83c462f is described below
commit 20ee83c462f544d73518bd7ee05d21d971c56f24
Author: Ken Huang <[email protected]>
AuthorDate: Tue Jul 16 06:12:43 2024 +0900
KAFKA-17102 FetchRequest#forgottenTopics would return incorrect data
(#16557)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/common/requests/FetchRequest.java | 30 +++++---------
.../kafka/common/requests/FetchRequestTest.java | 47 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 20 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 1082200ec39..2e2f8646bfb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -52,7 +52,6 @@ public class FetchRequest extends AbstractRequest {
private final FetchRequestData data;
private volatile LinkedHashMap<TopicIdPartition, PartitionData> fetchData = null;
- private volatile List<TopicIdPartition> toForget = null;
// This is an immutable read-only structures derived from FetchRequestData
private final FetchMetadata metadata;
@@ -434,26 +433,17 @@ public class FetchRequest extends AbstractRequest {
// For versions < 13, builds the forgotten topics list using only the FetchRequestData.
// For versions 13+, builds the forgotten topics list using both the FetchRequestData and a mapping of topic IDs to names.
public List<TopicIdPartition> forgottenTopics(Map<Uuid, String> topicNames) {
- if (toForget == null) {
- synchronized (this) {
- if (toForget == null) {
- // Assigning the lazy-initialized `toForget` in the last step
- // to avoid other threads accessing a half-initialized object.
- final List<TopicIdPartition> toForgetTmp = new ArrayList<>();
- data.forgottenTopicsData().forEach(forgottenTopic -> {
- String name;
- if (version() < 13) {
- name = forgottenTopic.topic(); // can't be null
- } else {
- name = topicNames.get(forgottenTopic.topicId());
- }
- // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
- forgottenTopic.partitions().forEach(partitionId -> toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
- });
- toForget = toForgetTmp;
- }
+ final List<TopicIdPartition> toForget = new ArrayList<>();
+ data.forgottenTopicsData().forEach(forgottenTopic -> {
+ String name;
+ if (version() < 13) {
+ name = forgottenTopic.topic(); // can't be null
+ } else {
+ name = topicNames.get(forgottenTopic.topicId());
}
- }
+ // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
+ forgottenTopic.partitions().forEach(partitionId -> toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
+ });
return toForget;
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
index b05c9c94172..ebda210d774 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
@@ -243,4 +243,51 @@ public class FetchRequestTest {
new FetchRequest.PartitionData(Uuid.randomUuid(), 300, 0L, 300, Optional.of(300)));
}
+ @ParameterizedTest
+ @MethodSource("fetchVersions")
+ public void testFetchRequestNoCacheData(short version) {
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+ TopicIdPartition tp = new TopicIdPartition(topicId, partition, "topic");
+
+ FetchRequest fetchRequest = createFetchRequestByVersion(version, topicId, tp);
+
+ Map<Uuid, String> topicNames = Collections.singletonMap(topicId, tp.topic());
+ List<TopicIdPartition> requestsWithTopicsName = fetchRequest.forgottenTopics(topicNames);
+ assertEquals(topicNames.size(), requestsWithTopicsName.size());
+ requestsWithTopicsName.forEach(request -> {
+ assertEquals(tp.topic(), request.topic());
+ assertEquals(topicId, request.topicId());
+ assertEquals(tp.partition(), request.partition());
+ assertEquals(tp.topicPartition(), request.topicPartition());
+ });
+
+ String expectedTopic = version >= 13 ? null : tp.topic();
+ List<TopicIdPartition> requestData = fetchRequest.forgottenTopics(Collections.emptyMap());
+ assertEquals(1, requestData.size());
+ requestData.forEach(request -> {
+ assertEquals(expectedTopic, request.topic());
+ assertEquals(topicId, request.topicId());
+ assertEquals(tp.partition(), request.partition());
+ assertEquals(new TopicPartition(expectedTopic, partition), request.topicPartition());
+ });
+
+ }
+
+ private FetchRequest createFetchRequestByVersion(short version, Uuid topicId, TopicIdPartition tp) {
+ Map<TopicPartition, FetchRequest.PartitionData> partitionData = Collections.singletonMap(tp.topicPartition(),
+ new FetchRequest.PartitionData(topicId, 0, 0, 0, Optional.empty()));
+ if (version >= 13) {
+ return FetchRequest.Builder
+ .forReplica(version, 0, 1, 1, 1, partitionData)
+ .replaced(Collections.singletonList(tp))
+ .metadata(FetchMetadata.newIncremental(123)).build(version);
+ } else {
+ return FetchRequest.Builder
+ .forReplica(version, 0, 1, 1, 1, partitionData)
+ .removed(Collections.singletonList(tp))
+ .metadata(FetchMetadata.newIncremental(123)).build(version);
+ }
+ }
+
}