This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 064dd5f8fcd MINOR: Require non-null arguments to 
GroupCoordinatorService#onMetadataUpdate (#21052)
064dd5f8fcd is described below

commit 064dd5f8fcdf71681f5e54e5e0e5f7ff1f35b5e6
Author: David Jacot <[email protected]>
AuthorDate: Wed Dec 3 12:05:55 2025 +0100

    MINOR: Require non-null arguments to 
GroupCoordinatorService#onMetadataUpdate (#21052)
    
    We had some tests passing null values to
    `GroupCoordinatorService#onMetadataUpdate` even though it is not an
    expected case. This patch fixes those tests and ensure that only
    non-null values are accepted.
    
    Reviewers: Lianet Magrans <[email protected]>, Sean Quah
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../runtime/KRaftCoordinatorMetadataDelta.java     |  9 +++---
 .../runtime/KRaftCoordinatorMetadataImage.java     |  3 +-
 .../runtime/KRaftCoordinatorMetadataDeltaTest.java | 10 ++----
 .../coordinator/group/GroupCoordinatorService.java |  7 +++--
 .../group/GroupCoordinatorServiceTest.java         | 36 ++++++++++------------
 5 files changed, 30 insertions(+), 35 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
index 8e340d81c88..fe557399645 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.image.MetadataDelta;
 
 import java.util.Collection;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -30,12 +31,12 @@ public class KRaftCoordinatorMetadataDelta implements 
CoordinatorMetadataDelta {
     final MetadataDelta metadataDelta;
 
     public KRaftCoordinatorMetadataDelta(MetadataDelta metadataDelta) {
-        this.metadataDelta = metadataDelta;
+        this.metadataDelta = Objects.requireNonNull(metadataDelta, 
"metadataDelta must be provided");
     }
 
     @Override
     public Collection<Uuid> createdTopicIds() {
-        if (metadataDelta == null || metadataDelta.topicsDelta() == null) {
+        if (metadataDelta.topicsDelta() == null) {
             return Set.of();
         }
         return metadataDelta.topicsDelta().createdTopicIds();
@@ -43,7 +44,7 @@ public class KRaftCoordinatorMetadataDelta implements 
CoordinatorMetadataDelta {
 
     @Override
     public Collection<Uuid> changedTopicIds() {
-        if (metadataDelta == null || metadataDelta.topicsDelta() == null) {
+        if (metadataDelta.topicsDelta() == null) {
             return Set.of();
         }
         return metadataDelta.topicsDelta().changedTopics().keySet();
@@ -51,7 +52,7 @@ public class KRaftCoordinatorMetadataDelta implements 
CoordinatorMetadataDelta {
 
     @Override
     public Set<Uuid> deletedTopicIds() {
-        if (metadataDelta == null || metadataDelta.topicsDelta() == null) {
+        if (metadataDelta.topicsDelta() == null) {
             return Set.of();
         }
         return metadataDelta.topicsDelta().deletedTopicIds();
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
index c0284a4aed6..8df13da1384 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
@@ -27,6 +27,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
@@ -38,7 +39,7 @@ public class KRaftCoordinatorMetadataImage implements 
CoordinatorMetadataImage {
     private final MetadataImage metadataImage;
 
     public KRaftCoordinatorMetadataImage(MetadataImage metadataImage) {
-        this.metadataImage = metadataImage;
+        this.metadataImage = Objects.requireNonNull(metadataImage, 
"metadataImage must be provided");
     }
 
     @Override
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
index f65103e87d8..e72170dc0f8 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
@@ -32,20 +32,14 @@ import java.util.Set;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KRaftCoordinatorMetadataDeltaTest {
 
     @Test
     public void testKRaftCoordinatorDeltaWithNulls() {
-        assertTrue(new 
KRaftCoordinatorMetadataDelta(null).changedTopicIds().isEmpty());
-        assertTrue(new KRaftCoordinatorMetadataDelta(new 
MetadataDelta(MetadataImage.EMPTY)).changedTopicIds().isEmpty());
-
-        assertTrue(new 
KRaftCoordinatorMetadataDelta(null).deletedTopicIds().isEmpty());
-        assertTrue(new KRaftCoordinatorMetadataDelta(new 
MetadataDelta(MetadataImage.EMPTY)).deletedTopicIds().isEmpty());
-
-        assertTrue(new 
KRaftCoordinatorMetadataDelta(null).createdTopicIds().isEmpty());
-        assertTrue(new KRaftCoordinatorMetadataDelta(new 
MetadataDelta(MetadataImage.EMPTY)).createdTopicIds().isEmpty());
+        assertThrows(NullPointerException.class, () -> new 
KRaftCoordinatorMetadataDelta(null));
     }
 
     @Test
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 5738abc5205..9c29183ca38 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -129,6 +129,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
@@ -2311,8 +2312,10 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         MetadataImage newImage
     ) {
         throwIfNotActive();
-        var wrappedImage = newImage == null ? null : new 
KRaftCoordinatorMetadataImage(newImage);
-        var wrappedDelta = delta == null ? null : new 
KRaftCoordinatorMetadataDelta(delta);
+        Objects.requireNonNull(delta, "delta must be provided");
+        Objects.requireNonNull(newImage, "newImage must be provided");
+        var wrappedImage = new KRaftCoordinatorMetadataImage(newImage);
+        var wrappedDelta = new KRaftCoordinatorMetadataDelta(delta);
         metadataImage = wrappedImage;
         runtime.onMetadataUpdate(wrappedDelta, wrappedImage);
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 515e55d5eee..df911666295 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -4226,10 +4226,9 @@ public class GroupCoordinatorServiceTest {
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
-            .build(true);
+            .build(false);
 
-        // Forcing a null Metadata Image
-        service.onMetadataUpdate(null, null);
+        service.startup(() -> 1);
 
         int partition = 1;
         
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData = new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
@@ -4273,7 +4272,7 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(null, image);
+        service.onMetadataUpdate(new MetadataDelta(image), image);
 
         int partition = 1;
 
@@ -4344,7 +4343,7 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(null, image);
+        service.onMetadataUpdate(new MetadataDelta(image), image);
 
         int partition = 1;
 
@@ -4380,7 +4379,7 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(null, image);
+        service.onMetadataUpdate(new MetadataDelta(image), image);
 
         int partition = 1;
 
@@ -4417,7 +4416,7 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(null, image);
+        service.onMetadataUpdate(new MetadataDelta(image), image);
 
         int partition = 1;
 
@@ -4504,10 +4503,9 @@ public class GroupCoordinatorServiceTest {
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
-            .build(true);
+            .build(false);
 
-        // Forcing a null Metadata Image
-        service.onMetadataUpdate(null, null);
+        service.startup(() -> 1);
 
         
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData = new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
             .setGroupId("share-group-id")
@@ -4701,10 +4699,9 @@ public class GroupCoordinatorServiceTest {
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
-            .build(true);
+            .build(false);
 
-        // Forcing a null Metadata Image
-        service.onMetadataUpdate(null, null);
+        service.startup(() -> 1);
 
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
             .setGroupId("share-group-id")
@@ -5581,7 +5578,7 @@ public class GroupCoordinatorServiceTest {
             .addTopic(topicId, "topic-name", 3)
             .build();
 
-        service.onMetadataUpdate(null, image);
+        service.onMetadataUpdate(new MetadataDelta(image), image);
 
         
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
             new InitializeShareGroupStateResult.Builder()
@@ -5756,7 +5753,7 @@ public class GroupCoordinatorServiceTest {
             .addTopic(topicId, "topic-name", 3)
             .build();
 
-        service.onMetadataUpdate(null, image);
+        service.onMetadataUpdate(new MetadataDelta(image), image);
 
         
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
             new InitializeShareGroupStateResult.Builder()
@@ -5815,10 +5812,9 @@ public class GroupCoordinatorServiceTest {
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
-            .build(true);
+            .build(false);
 
-        // Forcing a null Metadata Image
-        service.onMetadataUpdate(null, null);
+        service.startup(() -> 1);
 
         String groupId = "share-group";
         AlterShareGroupOffsetsRequestData request = new 
AlterShareGroupOffsetsRequestData()
@@ -5974,7 +5970,7 @@ public class GroupCoordinatorServiceTest {
             .addTopic(topicId, "topic-name", 1)
             .build();
 
-        service.onMetadataUpdate(null, image);
+        service.onMetadataUpdate(new MetadataDelta(image), image);
 
         
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
             new InitializeShareGroupStateResult.Builder()
@@ -6050,7 +6046,7 @@ public class GroupCoordinatorServiceTest {
 
             if (serviceStartup) {
                 service.startup(() -> 1);
-                service.onMetadataUpdate(null, metadataImage);
+                service.onMetadataUpdate(new MetadataDelta(metadataImage), 
metadataImage);
             }
 
             return service;

Reply via email to