This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d91256d MINOR: Fix NPE from addingReplicas and removingReplicas
(#10992)
d91256d is described below
commit d91256d989c56551cf34f1b64496e5e846628620
Author: David Arthur <[email protected]>
AuthorDate: Wed Jul 7 18:39:43 2021 -0400
MINOR: Fix NPE from addingReplicas and removingReplicas (#10992)
Fix NPE from addingReplicas and removingReplicas. Make addingReplicas and
removingReplicas in PartitionRecord non-nullable as described in KIP-746.
Reviewers: Colin P. McCabe <[email protected]>
---
.../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 14 +++++++-------
.../apache/kafka/controller/ReplicationControlManager.java | 8 ++++----
.../main/resources/common/metadata/PartitionRecord.json | 4 ++--
.../org/apache/kafka/controller/QuorumControllerTest.java | 8 ++++----
.../kafka/controller/ReplicationControlManagerTest.java | 4 ++--
.../test/java/org/apache/kafka/image/TopicsImageTest.java | 5 +++--
.../apache/kafka/metadata/PartitionRegistrationTest.java | 12 ++++++------
7 files changed, 28 insertions(+), 27 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1c5d5db..97e70b1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -45,6 +45,7 @@ import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.{ArgumentMatchers, Mockito}
+
import java.io.File
import java.net.InetAddress
import java.nio.file.Files
@@ -52,11 +53,10 @@ import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Collections, Optional, Properties}
-
import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord,
TopicRecord}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.image.{TopicImage, TopicsDelta, TopicsImage}
-import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -2543,13 +2543,13 @@ class ReplicaManagerTest {
val topicsByName = new util.HashMap[String, TopicImage]()
val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
fooPartitions.put(0, new PartitionRegistration(Array(1, 2, 3),
- Array(1, 2, 3), Array.empty[Int], Array.empty[Int], 1, 100, 200))
+ Array(1, 2, 3), Replicas.NONE, Replicas.NONE, 1, 100, 200))
fooPartitions.put(1, new PartitionRegistration(Array(4, 5, 6),
- Array(4, 5), Array.empty[Int], Array.empty[Int], 5, 300, 400))
+ Array(4, 5), Replicas.NONE, Replicas.NONE, 5, 300, 400))
val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
val barPartitions = new util.HashMap[Integer, PartitionRegistration]()
barPartitions.put(0, new PartitionRegistration(Array(2, 3, 4),
- Array(2, 3, 4), Array.empty[Int], Array.empty[Int], 3, 100, 200))
+ Array(2, 3, 4), Replicas.NONE, Replicas.NONE, 3, 100, 200))
val bar = new TopicImage("bar", BAR_UUID, barPartitions)
topicsById.put(FOO_UUID, foo)
topicsByName.put("foo", foo)
@@ -2601,10 +2601,10 @@ class ReplicaManagerTest {
new TopicPartition("foo", 1) -> true),
Map(new TopicPartition("baz", 0) -> LocalLeaderInfo(BAZ_UUID,
new PartitionRegistration(Array(1, 2, 4), Array(1, 2, 4),
- Array.empty[Int], Array.empty[Int], 1, 123, 456))),
+ Replicas.NONE, Replicas.NONE, 1, 123, 456))),
Map(new TopicPartition("baz", 1) -> LocalLeaderInfo(BAZ_UUID,
new PartitionRegistration(Array(2, 4, 1), Array(2, 4, 1),
- Array.empty[Int], Array.empty[Int], 2, 123, 456)))),
+ Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
replicaManager.calculateDeltaChanges(TEST_DELTA))
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 79fc77a..db8d86a 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -378,7 +378,7 @@ public class ReplicationControlManager {
replicationFactor =
OptionalInt.of(assignment.brokerIds().size());
int[] replicas = Replicas.toArray(assignment.brokerIds());
newParts.put(assignment.partitionIndex(), new
PartitionRegistration(
- replicas, replicas, null, null, replicas[0], 0, 0));
+ replicas, replicas, Replicas.NONE, Replicas.NONE,
replicas[0], 0, 0));
}
} else if (topic.replicationFactor() < -1 || topic.replicationFactor()
== 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@@ -401,7 +401,7 @@ public class ReplicationControlManager {
for (int partitionId = 0; partitionId < replicas.size();
partitionId++) {
int[] r = Replicas.toArray(replicas.get(partitionId));
newParts.put(partitionId,
- new PartitionRegistration(r, r, null, null, r[0], 0,
0));
+ new PartitionRegistration(r, r, Replicas.NONE,
Replicas.NONE, r[0], 0, 0));
}
} catch (InvalidReplicationFactorException e) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@@ -928,8 +928,8 @@ public class ReplicationControlManager {
setTopicId(topicId).
setReplicas(placement).
setIsr(placement).
- setRemovingReplicas(null).
- setAddingReplicas(null).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()).
setLeader(placement.get(0)).
setLeaderEpoch(0).
setPartitionEpoch(0),
PARTITION_RECORD.highestSupportedVersion()));
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json
b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index fc5650d..233f852 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -28,9 +28,9 @@
"about": "The replicas of this partition, sorted by preferred order." },
{ "name": "Isr", "type": "[]int32", "versions": "0+",
"about": "The in-sync replicas of this partition" },
- { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
"nullableVersions": "0+", "entityType": "brokerId",
+ { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
"entityType": "brokerId",
"about": "The replicas that we are in the process of removing." },
- { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
"nullableVersions": "0+", "entityType": "brokerId",
+ { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
"entityType": "brokerId",
"about": "The replicas that we are in the process of adding." },
{ "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
"entityType": "brokerId",
"about": "The lead replica, or -1 if there is no leader." },
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 3d824c6..45c969d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -427,13 +427,13 @@ public class QuorumControllerTest {
setName("foo").setTopicId(fooId), (short) 1),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
- setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(null).
- setAddingReplicas(null).setLeader(0).setLeaderEpoch(0).
+ setIsr(Arrays.asList(0, 1,
2)).setRemovingReplicas(Collections.emptyList()).
+
setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 1),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
- setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(null).
- setAddingReplicas(null).setLeader(1).setLeaderEpoch(0).
+ setIsr(Arrays.asList(1, 2,
0)).setRemovingReplicas(Collections.emptyList()).
+
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 1),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 5b40f6d..6298c95 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -185,7 +185,7 @@ public class ReplicationControlManagerTest {
assertEquals(expectedResponse2, result2.response());
ctx.replay(result2.records());
assertEquals(new PartitionRegistration(new int[] {1, 2, 0},
- new int[] {1, 2, 0}, null, null, 1, 0, 0),
+ new int[] {1, 2, 0}, Replicas.NONE, Replicas.NONE, 1, 0, 0),
replicationControl.getPartition(
((TopicRecord) result2.records().get(0).message()).topicId(),
0));
ControllerResult<CreateTopicsResponseData> result3 =
@@ -200,7 +200,7 @@ public class ReplicationControlManagerTest {
Arrays.asList(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(0).setTopicId(fooId).
setReplicas(Arrays.asList(1, 2,
0)).setIsr(Arrays.asList(1, 2, 0)).
-
setRemovingReplicas(null).setAddingReplicas(null).setLeader(1).
+
setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
setLeaderEpoch(0).setPartitionEpoch(0), (short) 1),
new ApiMessageAndVersion(new TopicRecord().
setTopicId(fooId).setName("foo"), (short) 1))),
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 7f0320f..7cd2279 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -82,9 +83,9 @@ public class TopicsImageTest {
List<TopicImage> topics1 = Arrays.asList(
newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
new PartitionRegistration(new int[] {2, 3, 4},
- new int[] {2, 3}, new int[0], new int[0], 2, 1, 345),
+ new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
new PartitionRegistration(new int[] {3, 4, 5},
- new int[] {3, 4, 5}, new int[0], new int[0], 3, 4, 684)),
+ new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684)),
newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4},
0, 1, 345)));
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 35b3ffc..787f032 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -47,11 +47,11 @@ public class PartitionRegistrationTest {
@Test
public void testPartitionControlInfoMergeAndDiff() {
PartitionRegistration a = new PartitionRegistration(
- new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{}, new int[]{}, 1,
0, 0);
+ new int[]{1, 2, 3}, new int[]{1, 2}, Replicas.NONE, Replicas.NONE,
1, 0, 0);
PartitionRegistration b = new PartitionRegistration(
- new int[]{1, 2, 3}, new int[]{3}, new int[]{}, new int[]{}, 3, 1,
1);
+ new int[]{1, 2, 3}, new int[]{3}, Replicas.NONE, Replicas.NONE, 3,
1, 1);
PartitionRegistration c = new PartitionRegistration(
- new int[]{1, 2, 3}, new int[]{1}, new int[]{}, new int[]{}, 1, 0,
1);
+ new int[]{1, 2, 3}, new int[]{1}, Replicas.NONE, Replicas.NONE, 1,
0, 1);
assertEquals(b, a.merge(new PartitionChangeRecord().
setLeader(3).setIsr(Arrays.asList(3))));
assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1,
partitionEpoch: 0 -> 1",
@@ -63,7 +63,7 @@ public class PartitionRegistrationTest {
@Test
public void testRecordRoundTrip() {
PartitionRegistration registrationA = new PartitionRegistration(
- new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{1}, new int[]{}, 1,
0, 0);
+ new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{1}, Replicas.NONE,
1, 0, 0);
Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
int partitionId = 4;
ApiMessageAndVersion record = registrationA.toRecord(topicId,
partitionId);
@@ -75,9 +75,9 @@ public class PartitionRegistrationTest {
@Test
public void testToLeaderAndIsrPartitionState() {
PartitionRegistration a = new PartitionRegistration(
- new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{}, new int[]{}, 1,
123, 456);
+ new int[]{1, 2, 3}, new int[]{1, 2}, Replicas.NONE, Replicas.NONE,
1, 123, 456);
PartitionRegistration b = new PartitionRegistration(
- new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{},
2, 234, 567);
+ new int[]{2, 3, 4}, new int[]{2, 3, 4}, Replicas.NONE,
Replicas.NONE, 2, 234, 567);
assertEquals(new LeaderAndIsrPartitionState().
setTopicName("foo").
setPartitionIndex(1).