This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 481cc13a13 KAFKA-13791: Fix potential race condition in
FetchResponse#`fetchData` and `forgottenTopics` (#11981)
481cc13a13 is described below
commit 481cc13a132d33f23e737f88ae28a1aac135afed
Author: yun-yun <[email protected]>
AuthorDate: Tue Apr 5 15:27:32 2022 +0800
KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and
`forgottenTopics` (#11981)
Fix FetchResponse#`fetchData` and `forgottenTopics`: Assignment of
lazy-initialized members should be the last step with double-checked locking
Reviewers: Luke Chen <[email protected]>
---
.../org/apache/kafka/common/requests/FetchRequest.java | 16 +++++++++++-----
.../org/apache/kafka/common/requests/FetchResponse.java | 2 +-
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 48ba022610..09242bfc4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -351,8 +351,10 @@ public class FetchRequest extends AbstractRequest {
if (fetchData == null) {
synchronized (this) {
if (fetchData == null) {
- fetchData = new LinkedHashMap<>();
- short version = version();
+ // Assigning the lazy-initialized `fetchData` in the last
step
+ // to avoid other threads accessing a half-initialized
object.
+ final LinkedHashMap<TopicIdPartition, PartitionData>
fetchDataTmp = new LinkedHashMap<>();
+ final short version = version();
data.topics().forEach(fetchTopic -> {
String name;
if (version < 13) {
@@ -362,7 +364,7 @@ public class FetchRequest extends AbstractRequest {
}
fetchTopic.partitions().forEach(fetchPartition ->
// Topic name may be null here if the topic
name was unable to be resolved using the topicNames map.
- fetchData.put(new
TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name,
fetchPartition.partition())),
+ fetchDataTmp.put(new
TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name,
fetchPartition.partition())),
new PartitionData(
fetchTopic.topicId(),
fetchPartition.fetchOffset(),
@@ -374,6 +376,7 @@ public class FetchRequest extends AbstractRequest {
)
);
});
+ fetchData = fetchDataTmp;
}
}
}
@@ -386,7 +389,9 @@ public class FetchRequest extends AbstractRequest {
if (toForget == null) {
synchronized (this) {
if (toForget == null) {
- toForget = new ArrayList<>();
+ // Assigning the lazy-initialized `toForget` in the last
step
+ // to avoid other threads accessing a half-initialized
object.
+ final List<TopicIdPartition> toForgetTmp = new
ArrayList<>();
data.forgottenTopicsData().forEach(forgottenTopic -> {
String name;
if (version() < 13) {
@@ -395,8 +400,9 @@ public class FetchRequest extends AbstractRequest {
name = topicNames.get(forgottenTopic.topicId());
}
// Topic name may be null here if the topic name was
unable to be resolved using the topicNames map.
- forgottenTopic.partitions().forEach(partitionId ->
toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new
TopicPartition(name, partitionId))));
+ forgottenTopic.partitions().forEach(partitionId ->
toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new
TopicPartition(name, partitionId))));
});
+ toForget = toForgetTmp;
}
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0d7049d755..a4af4ca2a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -100,7 +100,7 @@ public class FetchResponse extends AbstractResponse {
if (responseData == null) {
synchronized (this) {
if (responseData == null) {
- // Assigning the lazy-initialized responseData in the last
step
+ // Assigning the lazy-initialized `responseData` in the
last step
// to avoid other threads accessing a half-initialized
object.
final LinkedHashMap<TopicPartition,
FetchResponseData.PartitionData> responseDataTmp =
new LinkedHashMap<>();