This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 46d7e44d1b6 KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283)
46d7e44d1b6 is described below

commit 46d7e44d1b6ed7807e5ec692f397a3d4118155b5
Author: David Jacot <[email protected]>
AuthorDate: Tue Jun 11 20:43:56 2024 +0200

    KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283)
    
    Fix the following NPE:
    
    ```
    java.lang.NullPointerException: Cannot invoke "org.apache.kafka.coordinator.group.assignor.MemberAssignment.targetPartitions()" because the return value of "java.util.Map.get(Object)" is null
            at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.canMemberParticipateInReassignment(GeneralUniformAssignmentBuilder.java:248)
            at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.balance(GeneralUniformAssignmentBuilder.java:336)
            at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.buildAssignment(GeneralUniformAssignmentBuilder.java:157)
            at org.apache.kafka.coordinator.group.assignor.UniformAssignor.assign(UniformAssignor.java:84)
            at org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:302)
            at org.apache.kafka.coordinator.group.GroupMetadataManager.updateTargetAssignment(GroupMetadataManager.java:1913)
            at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:1518)
            at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:2254)
            at org.apache.kafka.coordinator.group.GroupCoordinatorShard.consumerGroupHeartbeat(GroupCoordinatorShard.java:308)
            at org.apache.kafka.coordinator.group.GroupCoordinatorService.lambda$consumerGroupHeartbeat$0(GroupCoordinatorService.java:298)
            at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:769)
            at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:1582)
            at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.access$1400(CoordinatorRuntime.java:96)
            at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:767)
            at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:144)
            at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:176)
    ```
    
    Reviewers: Lianet Magrans <[email protected]>, Justine Olshan <[email protected]>
---
 .../UniformHeterogeneousAssignmentBuilder.java     |  8 ++--
 .../UniformHeterogeneousAssignmentBuilderTest.java | 45 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
index 682dbbd677d..a5e63d87805 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
@@ -119,7 +119,7 @@ public class UniformHeterogeneousAssignmentBuilder {
         this.subscribedTopicIds = new HashSet<>();
         this.membersPerTopic = new HashMap<>();
         this.targetAssignment = new HashMap<>();
-        groupSpec.memberIds().forEach(memberId ->
+        groupSpec.memberIds().forEach(memberId -> {
             
groupSpec.memberSubscription(memberId).subscribedTopicIds().forEach(topicId -> {
                 // Check if the subscribed topic exists.
                 int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
@@ -130,9 +130,9 @@ public class UniformHeterogeneousAssignmentBuilder {
                 }
                 subscribedTopicIds.add(topicId);
                 membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
-                targetAssignment.put(memberId, new MemberAssignmentImpl(new 
HashMap<>()));
-            })
-        );
+            });
+            targetAssignment.put(memberId, new MemberAssignmentImpl(new 
HashMap<>()));
+        });
         this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, 
subscribedTopicDescriber);
         this.assignedStickyPartitions = new HashSet<>();
         this.assignmentManager = new 
AssignmentManager(this.subscribedTopicDescriber);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
index 4e48de28d48..183e80971d2 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
@@ -601,4 +601,49 @@ public class UniformHeterogeneousAssignmentBuilderTest {
 
         assertAssignment(expectedAssignment, computedAssignment);
     }
+
+    @Test
+    public void 
testFirstAssignmentWithTwoMembersIncludingOneWithoutSubscriptions() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        topicMetadata.put(topic1Uuid, new TopicMetadata(
+            topic1Uuid,
+            topic1Name,
+            3,
+            mkMapOfPartitionRacks(3)
+        ));
+
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
+
+        members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            mkSet(topic1Uuid),
+            Assignment.EMPTY
+        ));
+
+        members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Collections.emptySet(),
+            Assignment.EMPTY
+        ));
+
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            Collections.emptyMap()
+        );
+        SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
+        expectedAssignment.put(memberA, mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 1, 2)
+        ));
+        expectedAssignment.put(memberB, Collections.emptyMap());
+
+        assertAssignment(expectedAssignment, computedAssignment);
+    }
 }

Reply via email to