This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d07e572877  KAFKA-15771: fix concurrency bug in 
ProduceRequest#partitionSizes() (#14674)
2d07e572877 is described below

commit 2d07e572877933423349eb73995062101987e902
Author: Xiaobing Fang <[email protected]>
AuthorDate: Tue Nov 7 16:44:07 2023 +0800

     KAFKA-15771: fix concurrency bug in ProduceRequest#partitionSizes() 
(#14674)
    
    A commit fixes a bug in ProduceRequest#partitionSizes() that may cause this 
method to incorrectly returning an empty or incomplete response for a thread 
when another thread is in the process of initialising it.
    
    Reviewers: Divij Vaidya <[email protected]>, hudeqi <[email protected]>, 
vamossagar12 <[email protected]>
    
    --------------------------------
    Co-authored-by: fangxiaobing <[email protected]>
---
 .../main/java/org/apache/kafka/common/requests/ProduceRequest.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 24a84de438d..4724ce4789c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -133,14 +133,15 @@ public class ProduceRequest extends AbstractRequest {
             // this method may be called by different thread (see the comment 
on data)
             synchronized (this) {
                 if (partitionSizes == null) {
-                    partitionSizes = new HashMap<>();
+                    Map<TopicPartition, Integer> tmpPartitionSizes = new 
HashMap<>();
                     data.topicData().forEach(topicData ->
                         topicData.partitionData().forEach(partitionData ->
-                            partitionSizes.compute(new 
TopicPartition(topicData.name(), partitionData.index()),
+                            tmpPartitionSizes.compute(new 
TopicPartition(topicData.name(), partitionData.index()),
                                 (ignored, previousValue) ->
                                     partitionData.records().sizeInBytes() + 
(previousValue == null ? 0 : previousValue))
                         )
                     );
+                    partitionSizes = tmpPartitionSizes;
                 }
             }
         }

Reply via email to