This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 481cc13a13 KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)
481cc13a13 is described below

commit 481cc13a132d33f23e737f88ae28a1aac135afed
Author: yun-yun <[email protected]>
AuthorDate: Tue Apr 5 15:27:32 2022 +0800

    KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)
    
    Fix FetchRequest#`fetchData` and `forgottenTopics`: with double-checked locking, the assignment to a lazy-initialized member must be the last step, so that other threads cannot access a half-initialized object.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../org/apache/kafka/common/requests/FetchRequest.java   | 16 +++++++++++-----
 .../org/apache/kafka/common/requests/FetchResponse.java  |  2 +-
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 48ba022610..09242bfc4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -351,8 +351,10 @@ public class FetchRequest extends AbstractRequest {
         if (fetchData == null) {
             synchronized (this) {
                 if (fetchData == null) {
-                    fetchData = new LinkedHashMap<>();
-                    short version = version();
+                    // Assigning the lazy-initialized `fetchData` in the last 
step
+                    // to avoid other threads accessing a half-initialized 
object.
+                    final LinkedHashMap<TopicIdPartition, PartitionData> 
fetchDataTmp = new LinkedHashMap<>();
+                    final short version = version();
                     data.topics().forEach(fetchTopic -> {
                         String name;
                         if (version < 13) {
@@ -362,7 +364,7 @@ public class FetchRequest extends AbstractRequest {
                         }
                         fetchTopic.partitions().forEach(fetchPartition ->
                                 // Topic name may be null here if the topic 
name was unable to be resolved using the topicNames map.
-                                fetchData.put(new 
TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, 
fetchPartition.partition())),
+                                fetchDataTmp.put(new 
TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, 
fetchPartition.partition())),
                                         new PartitionData(
                                                 fetchTopic.topicId(),
                                                 fetchPartition.fetchOffset(),
@@ -374,6 +376,7 @@ public class FetchRequest extends AbstractRequest {
                                 )
                         );
                     });
+                    fetchData = fetchDataTmp;
                 }
             }
         }
@@ -386,7 +389,9 @@ public class FetchRequest extends AbstractRequest {
         if (toForget == null) {
             synchronized (this) {
                 if (toForget == null) {
-                    toForget = new ArrayList<>();
+                    // Assigning the lazy-initialized `toForget` in the last 
step
+                    // to avoid other threads accessing a half-initialized 
object.
+                    final List<TopicIdPartition> toForgetTmp = new 
ArrayList<>();
                     data.forgottenTopicsData().forEach(forgottenTopic -> {
                         String name;
                         if (version() < 13) {
@@ -395,8 +400,9 @@ public class FetchRequest extends AbstractRequest {
                             name = topicNames.get(forgottenTopic.topicId());
                         }
                         // Topic name may be null here if the topic name was 
unable to be resolved using the topicNames map.
-                        forgottenTopic.partitions().forEach(partitionId -> 
toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new 
TopicPartition(name, partitionId))));
+                        forgottenTopic.partitions().forEach(partitionId -> 
toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new 
TopicPartition(name, partitionId))));
                     });
+                    toForget = toForgetTmp;
                 }
             }
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0d7049d755..a4af4ca2a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -100,7 +100,7 @@ public class FetchResponse extends AbstractResponse {
         if (responseData == null) {
             synchronized (this) {
                 if (responseData == null) {
-                    // Assigning the lazy-initialized responseData in the last 
step
+                    // Assigning the lazy-initialized `responseData` in the 
last step
                     // to avoid other threads accessing a half-initialized 
object.
                     final LinkedHashMap<TopicPartition, 
FetchResponseData.PartitionData> responseDataTmp =
                             new LinkedHashMap<>();

Reply via email to