This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit bbd659325a60c8923afaa0bf099ad0536e297914
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Aug 2 15:39:47 2022 -0700

    KAFKA-14129: KRaft must check manual assignments for createTopics are 
contiguous (#12467)
    
    KRaft should validate that manual assignments given to createTopics are 
contiguous. In other words,
    they must start with partition 0, and progress through 1, 2, 3, etc. ZK 
mode does this, but KRaft
    mode previously did not. Also fix a null pointer exception when the 
placement for partition 0
    was not specified.
    
    Convert over AddPartitionsTest to use KRaft. This PR converts all of the 
tests except for some of
    the placement logic tests, which will need to be redone for KRaft mode in a 
future change.
    
    Fix null pointer exception in KRaftMetadataCache#getPartitionInfo.  
Specifically, we should not
    assume that the partition will be found in the hash map. This is another 
case where we had
    "Some(x)" but it should be "Option(x)."
    
    Fix a potential null pointer exception in BrokerServer#brokerState.
    
    Reviewers: dengziming <[email protected]>, Jason Gustafson 
<[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   5 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |   2 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 164 ++++++++++++++-------
 .../controller/ReplicationControlManager.java      |   8 +-
 4 files changed, 123 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index eb21c1ed25e..0bdd6734975 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -81,7 +81,8 @@ class BrokerServer(
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]]
 ) extends KafkaBroker {
 
-  override def brokerState: BrokerState = lifecycleManager.state
+  override def brokerState: BrokerState = Option(lifecycleManager).
+    flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING)
 
   import kafka.server.Server._
 
@@ -89,7 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  @volatile private var lifecycleManager: BrokerLifecycleManager = null
+  @volatile var lifecycleManager: BrokerLifecycleManager = null
 
   private val isShuttingDown = new AtomicBoolean(false)
 
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index ae2e6523573..52577211503 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -229,7 +229,7 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 
   override def getPartitionInfo(topicName: String, partitionId: Int): 
Option[UpdateMetadataPartitionState] = {
     Option(_currentImage.topics().getTopic(topicName)).
-      flatMap(topic => Some(topic.partitions().get(partitionId))).
+      flatMap(topic => Option(topic.partitions().get(partitionId))).
       flatMap(partition => Some(new UpdateMetadataPartitionState().
         setTopicName(topicName).
         setPartitionIndex(partitionId).
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ea4215d9c39..4e2bfee60ee 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,18 +17,24 @@
 
 package kafka.admin
 
-import java.util.Optional
+import java.util.{Collections, Optional}
 import kafka.controller.ReplicaAssignment
-import kafka.server.BaseRequestTest
-import kafka.utils.TestUtils
+import kafka.server.{BaseRequestTest, BrokerServer}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections.singletonList
+import java.util.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
 class AddPartitionsTest extends BaseRequestTest {
@@ -47,44 +53,97 @@ class AddPartitionsTest extends BaseRequestTest {
   val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
   val topic5 = "new-topic5"
   val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
+  var admin: Admin = null
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
 
+    if (isKRaftTest()) {
+      brokers.foreach(broker => 
broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
+    }
     createTopicWithAssignment(topic1, partitionReplicaAssignment = 
topic1Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic2, partitionReplicaAssignment = 
topic2Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic3, partitionReplicaAssignment = 
topic3Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic4, partitionReplicaAssignment = 
topic4Assignment.map { case (k, v) => k -> v.replicas })
+    admin = createAdminClient()
   }
 
-  @Test
-  def testWrongReplicaCount(): Unit = {
-    assertThrows(classOf[InvalidReplicaAssignmentException], () => 
adminZkClient.addPartitions(topic1, topic1Assignment, 
adminZkClient.getBrokerMetadatas(), 2,
-      Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2)))))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWrongReplicaCount(quorum: String): Unit = {
+    assertEquals(classOf[InvalidReplicaAssignmentException], 
assertThrows(classOf[ExecutionException], () => {
+        admin.createPartitions(Collections.singletonMap(topic1,
+          NewPartitions.increaseTo(2, singletonList(asList(0, 1, 
2))))).all().get()
+      }).getCause.getClass)
   }
 
-  @Test
-  def testMissingPartition0(): Unit = {
-    val e = assertThrows(classOf[AdminOperationException], () => 
adminZkClient.addPartitions(topic5, topic5Assignment, 
adminZkClient.getBrokerMetadatas(), 2,
-      Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
-    assertTrue(e.getMessage.contains("Unexpected existing replica assignment 
for topic 'new-topic5', partition id 0 is missing"))
+  /**
+   * Test that when we supply a manual partition assignment to createTopics, 
it must be 0-based
+   * and consecutive.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMissingPartitionsInCreateTopics(quorum: String): Unit = {
+    val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
+    topic6Placements.put(1, asList(0, 1))
+    topic6Placements.put(2, asList(1, 0))
+    val topic7Placements = new util.HashMap[Integer, util.List[Integer]]
+    topic7Placements.put(2, asList(0, 1))
+    topic7Placements.put(3, asList(1, 0))
+    val futures = admin.createTopics(asList(
+      new NewTopic("new-topic6", topic6Placements),
+      new NewTopic("new-topic7", topic7Placements))).values()
+    val topic6Cause = assertThrows(classOf[ExecutionException], () => 
futures.get("new-topic6").get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], 
topic6Cause.getClass)
+    assertTrue(topic6Cause.getMessage.contains("partitions should be a 
consecutive 0-based integer sequence"),
+      "Unexpected error message: " + topic6Cause.getMessage)
+    val topic7Cause = assertThrows(classOf[ExecutionException], () => 
futures.get("new-topic7").get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], 
topic7Cause.getClass)
+    assertTrue(topic7Cause.getMessage.contains("partitions should be a 
consecutive 0-based integer sequence"),
+      "Unexpected error message: " + topic7Cause.getMessage)
   }
 
-  @Test
-  def testIncrementPartitions(): Unit = {
-    adminZkClient.addPartitions(topic1, topic1Assignment, 
adminZkClient.getBrokerMetadatas(), 3)
+  /**
+   * Test that when we supply a manual partition assignment to 
createPartitions, it must contain
+   * enough partitions.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMissingPartitionsInCreatePartitions(quorum: String): Unit = {
+    val cause = assertThrows(classOf[ExecutionException], () =>
+      admin.createPartitions(Collections.singletonMap(topic1,
+        NewPartitions.increaseTo(3, singletonList(asList(0, 1, 
2))))).all().get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
+    if (isKRaftTest()) {
+      assertTrue(cause.getMessage.contains("Attempted to add 2 additional 
partition(s), but only 1 assignment(s) " +
+        "were specified."), "Unexpected error message: " + cause.getMessage)
+    } else {
+      assertTrue(cause.getMessage.contains("Increasing the number of 
partitions by 2 but 1 assignments provided."),
+        "Unexpected error message: " + cause.getMessage)
+    }
+    if (!isKRaftTest()) {
+      // In ZK mode, test the raw AdminZkClient method as well.
+      val e = assertThrows(classOf[AdminOperationException], () => 
adminZkClient.addPartitions(
+        topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
+        Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
+      assertTrue(e.getMessage.contains("Unexpected existing replica assignment 
for topic 'new-topic5', partition " +
+        "id 0 is missing"))
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncrementPartitions(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic1, 
NewPartitions.increaseTo(3))).all().get()
+
     // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2)
-    val leader1FromZk = zkClient.getLeaderForPartition(new 
TopicPartition(topic1, 1)).get
-    val leader2FromZk = zkClient.getLeaderForPartition(new 
TopicPartition(topic1, 2)).get
-    assertEquals(leader1, leader1FromZk)
-    assertEquals(leader2, leader2FromZk)
+    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1)
+    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 2)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic1, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic1, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic1, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic1, 2)
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
@@ -102,22 +161,21 @@ class AddPartitionsTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testManualAssignmentOfReplicas(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testManualAssignmentOfReplicas(quorum: String): Unit = {
     // Add 2 partitions
-    adminZkClient.addPartitions(topic2, topic2Assignment, 
adminZkClient.getBrokerMetadatas(), 3,
-      Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
+    admin.createPartitions(Collections.singletonMap(topic2, 
NewPartitions.increaseTo(3,
+      asList(asList(0, 1), asList(2, 3))))).all().get()
     // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2)
-    val leader1FromZk = zkClient.getLeaderForPartition(new 
TopicPartition(topic2, 1)).get
-    val leader2FromZk = zkClient.getLeaderForPartition(new 
TopicPartition(topic2, 2)).get
-    assertEquals(leader1, leader1FromZk)
-    assertEquals(leader2, leader2FromZk)
+    val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1)
+    val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic2, 2)
+    val partition1Metadata = TestUtils.waitForPartitionMetadata(brokers, 
topic2, 1)
+    assertEquals(leader1, partition1Metadata.leader())
+    val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, 
topic2, 2)
+    assertEquals(leader2, partition2Metadata.leader())
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
@@ -132,17 +190,18 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(Set(0, 1), replicas.asScala.toSet)
   }
 
-  @Test
-  def testReplicaPlacementAllServers(): Unit = {
-    adminZkClient.addPartitions(topic3, topic3Assignment, 
adminZkClient.getBrokerMetadatas(), 7)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  def testReplicaPlacementAllServers(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic3, 
NewPartitions.increaseTo(7))).all().get()
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic3, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 2)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 3)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 4)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 5)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 6)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 3)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 4)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 5)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 6)
 
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
@@ -157,13 +216,14 @@ class AddPartitionsTest extends BaseRequestTest {
     validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
   }
 
-  @Test
-  def testReplicaPlacementPartialServers(): Unit = {
-    adminZkClient.addPartitions(topic2, topic2Assignment, 
adminZkClient.getBrokerMetadatas(), 3)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  def testReplicaPlacementPartialServers(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic2, 
NewPartitions.increaseTo(3))).all().get()
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic2, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
 
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 3a3788c41a5..bf3a679d2ce 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -666,6 +666,12 @@ public class ReplicationControlManager {
                     Replicas.toArray(assignment.brokerIds()), 
Replicas.toArray(isr),
                     Replicas.NONE, Replicas.NONE, isr.get(0), 
LeaderRecoveryState.RECOVERED, 0, 0));
             }
+            for (int i = 0; i < newParts.size(); i++) {
+                if (!newParts.containsKey(i)) {
+                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                            "partitions should be a consecutive 0-based 
integer sequence");
+                }
+            }
             ApiError error = maybeCheckCreateTopicPolicy(() -> {
                 Map<Integer, List<Integer>> assignments = new HashMap<>();
                 newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
@@ -744,7 +750,7 @@ public class ReplicationControlManager {
                     setIsSensitive(entry.isSensitive()));
             }
             result.setNumPartitions(newParts.size());
-            result.setReplicationFactor((short) 
newParts.get(0).replicas.length);
+            result.setReplicationFactor((short) 
newParts.values().iterator().next().replicas.length);
             result.setTopicConfigErrorCode(NONE.code());
         } else {
             result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());

Reply via email to