This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 46d7e44d1b6 KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws
NPE when one member has no subscriptions (#16283)
46d7e44d1b6 is described below
commit 46d7e44d1b6ed7807e5ec692f397a3d4118155b5
Author: David Jacot <[email protected]>
AuthorDate: Tue Jun 11 20:43:56 2024 +0200
KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one
member has no subscriptions (#16283)
Fix the following NPE:
```
java.lang.NullPointerException: Cannot invoke
"org.apache.kafka.coordinator.group.assignor.MemberAssignment.targetPartitions()"
because the return value of "java.util.Map.get(Object)" is null
at
org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.canMemberParticipateInReassignment(GeneralUniformAssignmentBuilder.java:248)
at
org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.balance(GeneralUniformAssignmentBuilder.java:336)
at
org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.buildAssignment(GeneralUniformAssignmentBuilder.java:157)
at
org.apache.kafka.coordinator.group.assignor.UniformAssignor.assign(UniformAssignor.java:84)
at
org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:302)
at
org.apache.kafka.coordinator.group.GroupMetadataManager.updateTargetAssignment(GroupMetadataManager.java:1913)
at
org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:1518)
at
org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:2254)
at
org.apache.kafka.coordinator.group.GroupCoordinatorShard.consumerGroupHeartbeat(GroupCoordinatorShard.java:308)
at
org.apache.kafka.coordinator.group.GroupCoordinatorService.lambda$consumerGroupHeartbeat$0(GroupCoordinatorService.java:298)
at
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:769)
at
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:1582)
at
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.access$1400(CoordinatorRuntime.java:96)
at
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:767)
at
org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:144)
at
org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:176)
```
Reviewers: Lianet Magrans <[email protected]>, Justine Olshan
<[email protected]>
---
.../UniformHeterogeneousAssignmentBuilder.java | 8 ++--
.../UniformHeterogeneousAssignmentBuilderTest.java | 45 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 4 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
index 682dbbd677d..a5e63d87805 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
@@ -119,7 +119,7 @@ public class UniformHeterogeneousAssignmentBuilder {
this.subscribedTopicIds = new HashSet<>();
this.membersPerTopic = new HashMap<>();
this.targetAssignment = new HashMap<>();
- groupSpec.memberIds().forEach(memberId ->
+ groupSpec.memberIds().forEach(memberId -> {
groupSpec.memberSubscription(memberId).subscribedTopicIds().forEach(topicId -> {
// Check if the subscribed topic exists.
int partitionCount =
subscribedTopicDescriber.numPartitions(topicId);
@@ -130,9 +130,9 @@ public class UniformHeterogeneousAssignmentBuilder {
}
subscribedTopicIds.add(topicId);
membersPerTopic.computeIfAbsent(topicId, k -> new
ArrayList<>()).add(memberId);
- targetAssignment.put(memberId, new MemberAssignmentImpl(new
HashMap<>()));
- })
- );
+ });
+ targetAssignment.put(memberId, new MemberAssignmentImpl(new
HashMap<>()));
+ });
this.unassignedPartitions = topicIdPartitions(subscribedTopicIds,
subscribedTopicDescriber);
this.assignedStickyPartitions = new HashSet<>();
this.assignmentManager = new
AssignmentManager(this.subscribedTopicDescriber);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
index 4e48de28d48..183e80971d2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
@@ -601,4 +601,49 @@ public class UniformHeterogeneousAssignmentBuilderTest {
assertAssignment(expectedAssignment, computedAssignment);
}
+
+ @Test
+ public void
testFirstAssignmentWithTwoMembersIncludingOneWithoutSubscriptions() {
+ Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+ topicMetadata.put(topic1Uuid, new TopicMetadata(
+ topic1Uuid,
+ topic1Name,
+ 3,
+ mkMapOfPartitionRacks(3)
+ ));
+
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
+
+ members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ mkSet(topic1Uuid),
+ Assignment.EMPTY
+ ));
+
+ members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Collections.emptySet(),
+ Assignment.EMPTY
+ ));
+
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ Collections.emptyMap()
+ );
+ SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
+ Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new
HashMap<>();
+ expectedAssignment.put(memberA, mkAssignment(
+ mkTopicAssignment(topic1Uuid, 0, 1, 2)
+ ));
+ expectedAssignment.put(memberB, Collections.emptyMap());
+
+ assertAssignment(expectedAssignment, computedAssignment);
+ }
}