This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new d91256d  MINOR: Fix NPE from addingReplicas and removingReplicas (#10992)
d91256d is described below

commit d91256d989c56551cf34f1b64496e5e846628620
Author: David Arthur <[email protected]>
AuthorDate: Wed Jul 7 18:39:43 2021 -0400

    MINOR: Fix NPE from addingReplicas and removingReplicas (#10992)
    
    Fix NPE from addingReplicas and removingReplicas. Make addingReplicas and
    removingReplicas in PartitionRecord non-nullable as described in KIP-746.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala  | 14 +++++++-------
 .../apache/kafka/controller/ReplicationControlManager.java |  8 ++++----
 .../main/resources/common/metadata/PartitionRecord.json    |  4 ++--
 .../org/apache/kafka/controller/QuorumControllerTest.java  |  8 ++++----
 .../kafka/controller/ReplicationControlManagerTest.java    |  4 ++--
 .../test/java/org/apache/kafka/image/TopicsImageTest.java  |  5 +++--
 .../apache/kafka/metadata/PartitionRegistrationTest.java   | 12 ++++++------
 7 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1c5d5db..97e70b1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -45,6 +45,7 @@ import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.{ArgumentMatchers, Mockito}
+
 import java.io.File
 import java.net.InetAddress
 import java.nio.file.Files
@@ -52,11 +53,10 @@ import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Collections, Optional, Properties}
-
 import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, 
TopicRecord}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.image.{TopicImage, TopicsDelta, TopicsImage}
-import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -2543,13 +2543,13 @@ class ReplicaManagerTest {
     val topicsByName = new util.HashMap[String, TopicImage]()
     val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
     fooPartitions.put(0, new PartitionRegistration(Array(1, 2, 3),
-      Array(1, 2, 3), Array.empty[Int], Array.empty[Int], 1, 100, 200))
+      Array(1, 2, 3), Replicas.NONE, Replicas.NONE, 1, 100, 200))
     fooPartitions.put(1, new PartitionRegistration(Array(4, 5, 6),
-      Array(4, 5), Array.empty[Int], Array.empty[Int], 5, 300, 400))
+      Array(4, 5), Replicas.NONE, Replicas.NONE, 5, 300, 400))
     val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
     val barPartitions = new util.HashMap[Integer, PartitionRegistration]()
     barPartitions.put(0, new PartitionRegistration(Array(2, 3, 4),
-      Array(2, 3, 4), Array.empty[Int], Array.empty[Int], 3, 100, 200))
+      Array(2, 3, 4), Replicas.NONE, Replicas.NONE, 3, 100, 200))
     val bar = new TopicImage("bar", BAR_UUID, barPartitions)
     topicsById.put(FOO_UUID, foo)
     topicsByName.put("foo", foo)
@@ -2601,10 +2601,10 @@ class ReplicaManagerTest {
         new TopicPartition("foo", 1) -> true),
       Map(new TopicPartition("baz", 0) -> LocalLeaderInfo(BAZ_UUID,
         new PartitionRegistration(Array(1, 2, 4), Array(1, 2, 4),
-          Array.empty[Int], Array.empty[Int], 1, 123, 456))),
+          Replicas.NONE, Replicas.NONE, 1, 123, 456))),
       Map(new TopicPartition("baz", 1) -> LocalLeaderInfo(BAZ_UUID,
         new PartitionRegistration(Array(2, 4, 1), Array(2, 4, 1),
-          Array.empty[Int], Array.empty[Int], 2, 123, 456)))),
+          Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 79fc77a..db8d86a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -378,7 +378,7 @@ public class ReplicationControlManager {
                 replicationFactor = 
OptionalInt.of(assignment.brokerIds().size());
                 int[] replicas = Replicas.toArray(assignment.brokerIds());
                 newParts.put(assignment.partitionIndex(), new 
PartitionRegistration(
-                    replicas, replicas, null, null, replicas[0], 0, 0));
+                    replicas, replicas, Replicas.NONE, Replicas.NONE, 
replicas[0], 0, 0));
             }
         } else if (topic.replicationFactor() < -1 || topic.replicationFactor() 
== 0) {
             return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@@ -401,7 +401,7 @@ public class ReplicationControlManager {
                 for (int partitionId = 0; partitionId < replicas.size(); 
partitionId++) {
                     int[] r = Replicas.toArray(replicas.get(partitionId));
                     newParts.put(partitionId,
-                        new PartitionRegistration(r, r, null, null, r[0], 0, 
0));
+                        new PartitionRegistration(r, r, Replicas.NONE, 
Replicas.NONE, r[0], 0, 0));
                 }
             } catch (InvalidReplicationFactorException e) {
                 return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@@ -928,8 +928,8 @@ public class ReplicationControlManager {
                 setTopicId(topicId).
                 setReplicas(placement).
                 setIsr(placement).
-                setRemovingReplicas(null).
-                setAddingReplicas(null).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()).
                 setLeader(placement.get(0)).
                 setLeaderEpoch(0).
                 setPartitionEpoch(0), 
PARTITION_RECORD.highestSupportedVersion()));
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json 
b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index fc5650d..233f852 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -28,9 +28,9 @@
       "about": "The replicas of this partition, sorted by preferred order." },
     { "name": "Isr", "type":  "[]int32", "versions":  "0+",
       "about": "The in-sync replicas of this partition" },
-    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", 
"nullableVersions": "0+", "entityType": "brokerId",
+    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", 
"entityType": "brokerId",
       "about": "The replicas that we are in the process of removing." },
-    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", 
"nullableVersions": "0+", "entityType": "brokerId",
+    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", 
"entityType": "brokerId",
       "about": "The replicas that we are in the process of adding." },
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", 
"entityType": "brokerId",
       "about": "The lead replica, or -1 if there is no leader." },
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 3d824c6..45c969d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -427,13 +427,13 @@ public class QuorumControllerTest {
                 setName("foo").setTopicId(fooId), (short) 1),
             new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
                 setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
-                setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(null).
-                setAddingReplicas(null).setLeader(0).setLeaderEpoch(0).
+                setIsr(Arrays.asList(0, 1, 
2)).setRemovingReplicas(Collections.emptyList()).
+                
setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
                 setPartitionEpoch(0), (short) 1),
             new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
                 setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
-                setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(null).
-                setAddingReplicas(null).setLeader(1).setLeaderEpoch(0).
+                setIsr(Arrays.asList(1, 2, 
0)).setRemovingReplicas(Collections.emptyList()).
+                
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
                 setPartitionEpoch(0), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 5b40f6d..6298c95 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -185,7 +185,7 @@ public class ReplicationControlManagerTest {
         assertEquals(expectedResponse2, result2.response());
         ctx.replay(result2.records());
         assertEquals(new PartitionRegistration(new int[] {1, 2, 0},
-            new int[] {1, 2, 0}, null, null, 1, 0, 0),
+            new int[] {1, 2, 0}, Replicas.NONE, Replicas.NONE, 1, 0, 0),
             replicationControl.getPartition(
                 ((TopicRecord) result2.records().get(0).message()).topicId(), 
0));
         ControllerResult<CreateTopicsResponseData> result3 =
@@ -200,7 +200,7 @@ public class ReplicationControlManagerTest {
             Arrays.asList(new ApiMessageAndVersion(new PartitionRecord().
                     setPartitionId(0).setTopicId(fooId).
                     setReplicas(Arrays.asList(1, 2, 
0)).setIsr(Arrays.asList(1, 2, 0)).
-                    
setRemovingReplicas(null).setAddingReplicas(null).setLeader(1).
+                    
setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
                     setLeaderEpoch(0).setPartitionEpoch(0), (short) 1),
                 new ApiMessageAndVersion(new TopicRecord().
                     setTopicId(fooId).setName("foo"), (short) 1))),
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 7f0320f..7cd2279 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -82,9 +83,9 @@ public class TopicsImageTest {
         List<TopicImage> topics1 = Arrays.asList(
             newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
             new PartitionRegistration(new int[] {2, 3, 4},
-                new int[] {2, 3}, new int[0], new int[0], 2, 1, 345),
+                new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
             new PartitionRegistration(new int[] {3, 4, 5},
-                new int[] {3, 4, 5}, new int[0], new int[0], 3, 4, 684)),
+                new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684)),
             newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
                 new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
                     new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 
0, 1, 345)));
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 35b3ffc..787f032 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -47,11 +47,11 @@ public class PartitionRegistrationTest {
     @Test
     public void testPartitionControlInfoMergeAndDiff() {
         PartitionRegistration a = new PartitionRegistration(
-            new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{}, new int[]{}, 1, 
0, 0);
+            new int[]{1, 2, 3}, new int[]{1, 2}, Replicas.NONE, Replicas.NONE, 
1, 0, 0);
         PartitionRegistration b = new PartitionRegistration(
-            new int[]{1, 2, 3}, new int[]{3}, new int[]{}, new int[]{}, 3, 1, 
1);
+            new int[]{1, 2, 3}, new int[]{3}, Replicas.NONE, Replicas.NONE, 3, 
1, 1);
         PartitionRegistration c = new PartitionRegistration(
-            new int[]{1, 2, 3}, new int[]{1}, new int[]{}, new int[]{}, 1, 0, 
1);
+            new int[]{1, 2, 3}, new int[]{1}, Replicas.NONE, Replicas.NONE, 1, 
0, 1);
         assertEquals(b, a.merge(new PartitionChangeRecord().
             setLeader(3).setIsr(Arrays.asList(3))));
         assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, 
partitionEpoch: 0 -> 1",
@@ -63,7 +63,7 @@ public class PartitionRegistrationTest {
     @Test
     public void testRecordRoundTrip() {
         PartitionRegistration registrationA = new PartitionRegistration(
-            new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{1}, new int[]{}, 1, 
0, 0);
+            new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{1}, Replicas.NONE, 
1, 0, 0);
         Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
         int partitionId = 4;
         ApiMessageAndVersion record = registrationA.toRecord(topicId, 
partitionId);
@@ -75,9 +75,9 @@ public class PartitionRegistrationTest {
     @Test
     public void testToLeaderAndIsrPartitionState() {
         PartitionRegistration a = new PartitionRegistration(
-            new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{}, new int[]{}, 1, 
123, 456);
+            new int[]{1, 2, 3}, new int[]{1, 2}, Replicas.NONE, Replicas.NONE, 
1, 123, 456);
         PartitionRegistration b = new PartitionRegistration(
-            new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 
2, 234, 567);
+            new int[]{2, 3, 4}, new int[]{2, 3, 4}, Replicas.NONE, 
Replicas.NONE, 2, 234, 567);
         assertEquals(new LeaderAndIsrPartitionState().
                 setTopicName("foo").
                 setPartitionIndex(1).

Reply via email to