This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 20ee83c462f KAFKA-17102 FetchRequest#forgottenTopics would return 
incorrect data (#16557)
20ee83c462f is described below

commit 20ee83c462f544d73518bd7ee05d21d971c56f24
Author: Ken Huang <[email protected]>
AuthorDate: Tue Jul 16 06:12:43 2024 +0900

    KAFKA-17102 FetchRequest#forgottenTopics would return incorrect data 
(#16557)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/common/requests/FetchRequest.java | 30 +++++---------
 .../kafka/common/requests/FetchRequestTest.java    | 47 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 20 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 1082200ec39..2e2f8646bfb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -52,7 +52,6 @@ public class FetchRequest extends AbstractRequest {
 
     private final FetchRequestData data;
     private volatile LinkedHashMap<TopicIdPartition, PartitionData> fetchData 
= null;
-    private volatile List<TopicIdPartition> toForget = null;
 
     // This is an immutable read-only structures derived from FetchRequestData
     private final FetchMetadata metadata;
@@ -434,26 +433,17 @@ public class FetchRequest extends AbstractRequest {
     // For versions < 13, builds the forgotten topics list using only the 
FetchRequestData.
     // For versions 13+, builds the forgotten topics list using both the 
FetchRequestData and a mapping of topic IDs to names.
     public List<TopicIdPartition> forgottenTopics(Map<Uuid, String> 
topicNames) {
-        if (toForget == null) {
-            synchronized (this) {
-                if (toForget == null) {
-                    // Assigning the lazy-initialized `toForget` in the last 
step
-                    // to avoid other threads accessing a half-initialized 
object.
-                    final List<TopicIdPartition> toForgetTmp = new 
ArrayList<>();
-                    data.forgottenTopicsData().forEach(forgottenTopic -> {
-                        String name;
-                        if (version() < 13) {
-                            name = forgottenTopic.topic(); // can't be null
-                        } else {
-                            name = topicNames.get(forgottenTopic.topicId());
-                        }
-                        // Topic name may be null here if the topic name was 
unable to be resolved using the topicNames map.
-                        forgottenTopic.partitions().forEach(partitionId -> 
toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new 
TopicPartition(name, partitionId))));
-                    });
-                    toForget = toForgetTmp;
-                }
+        final List<TopicIdPartition> toForget = new ArrayList<>();
+        data.forgottenTopicsData().forEach(forgottenTopic -> {
+            String name;
+            if (version() < 13) {
+                name = forgottenTopic.topic(); // can't be null
+            } else {
+                name = topicNames.get(forgottenTopic.topicId());
             }
-        }
+            // Topic name may be null here if the topic name was unable to be 
resolved using the topicNames map.
+            forgottenTopic.partitions().forEach(partitionId -> 
toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new 
TopicPartition(name, partitionId))));
+        });
         return toForget;
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java 
b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
index b05c9c94172..ebda210d774 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java
@@ -243,4 +243,51 @@ public class FetchRequestTest {
             new FetchRequest.PartitionData(Uuid.randomUuid(), 300, 0L, 300, 
Optional.of(300)));
     }
 
+    @ParameterizedTest
+    @MethodSource("fetchVersions")
+    public void testFetchRequestNoCacheData(short version) {
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+        TopicIdPartition tp = new TopicIdPartition(topicId, partition, 
"topic");
+        
+        FetchRequest fetchRequest = createFetchRequestByVersion(version, 
topicId, tp);
+        
+        Map<Uuid, String> topicNames = Collections.singletonMap(topicId, 
tp.topic());
+        List<TopicIdPartition> requestsWithTopicsName = 
fetchRequest.forgottenTopics(topicNames);
+        assertEquals(topicNames.size(), requestsWithTopicsName.size());
+        requestsWithTopicsName.forEach(request -> {
+            assertEquals(tp.topic(), request.topic());
+            assertEquals(topicId, request.topicId());
+            assertEquals(tp.partition(), request.partition());
+            assertEquals(tp.topicPartition(), request.topicPartition());
+        });
+
+        String expectedTopic = version >= 13 ? null : tp.topic();
+        List<TopicIdPartition> requestData = 
fetchRequest.forgottenTopics(Collections.emptyMap());
+        assertEquals(1, requestData.size());
+        requestData.forEach(request -> {
+            assertEquals(expectedTopic, request.topic());
+            assertEquals(topicId, request.topicId());
+            assertEquals(tp.partition(), request.partition());
+            assertEquals(new TopicPartition(expectedTopic, partition), 
request.topicPartition());
+        });
+
+    }
+
+    private FetchRequest createFetchRequestByVersion(short version, Uuid 
topicId, TopicIdPartition tp) {
+        Map<TopicPartition, FetchRequest.PartitionData> partitionData = 
Collections.singletonMap(tp.topicPartition(),
+                new FetchRequest.PartitionData(topicId, 0, 0, 0, 
Optional.empty()));
+        if (version >= 13) {
+            return FetchRequest.Builder
+                    .forReplica(version, 0, 1, 1, 1, partitionData)
+                    .replaced(Collections.singletonList(tp))
+                    
.metadata(FetchMetadata.newIncremental(123)).build(version);
+        } else {
+            return FetchRequest.Builder
+                    .forReplica(version, 0, 1, 1, 1, partitionData)
+                    .removed(Collections.singletonList(tp))
+                    
.metadata(FetchMetadata.newIncremental(123)).build(version);
+        }
+    }
+
 }

Reply via email to