This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2d07e572877 KAFKA-15771: fix concurrency bug in
ProduceRequest#partitionSizes() (#14674)
2d07e572877 is described below
commit 2d07e572877933423349eb73995062101987e902
Author: Xiaobing Fang <[email protected]>
AuthorDate: Tue Nov 7 16:44:07 2023 +0800
KAFKA-15771: fix concurrency bug in ProduceRequest#partitionSizes()
(#14674)
A commit fixes a bug in ProduceRequest#partitionSizes() that may cause this
method to incorrectly returning an empty or incomplete response for a thread
when another thread is in the process of initialising it.
Reviewers: Divij Vaidya <[email protected]>, hudeqi <[email protected]>,
vamossagar12 <[email protected]>
--------------------------------
Co-authored-by: fangxiaobing <[email protected]>
---
.../main/java/org/apache/kafka/common/requests/ProduceRequest.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 24a84de438d..4724ce4789c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -133,14 +133,15 @@ public class ProduceRequest extends AbstractRequest {
// this method may be called by different thread (see the comment
on data)
synchronized (this) {
if (partitionSizes == null) {
- partitionSizes = new HashMap<>();
+ Map<TopicPartition, Integer> tmpPartitionSizes = new
HashMap<>();
data.topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData ->
- partitionSizes.compute(new
TopicPartition(topicData.name(), partitionData.index()),
+ tmpPartitionSizes.compute(new
TopicPartition(topicData.name(), partitionData.index()),
(ignored, previousValue) ->
partitionData.records().sizeInBytes() +
(previousValue == null ? 0 : previousValue))
)
);
+ partitionSizes = tmpPartitionSizes;
}
}
}