This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e94934b6b72 MINOR; DeleteTopics version tests (#12141)
e94934b6b72 is described below

commit e94934b6b7275c64cf4b9a63d8de88f326e591cd
Author: José Armando García Sancio <[email protected]>
AuthorDate: Thu May 12 13:04:48 2022 -0700

    MINOR; DeleteTopics version tests (#12141)
    
    Add a DeleteTopics test for all supported versions. Convert the
    DeleteTopicsRequestTest to run against both ZK and KRaft mode.
    
    Reviewers: Colin Patrick McCabe <[email protected]>, dengziming 
<[email protected]>
---
 .../common/message/LeaderAndIsrRequest.json        |  2 +-
 .../scala/kafka/controller/KafkaController.scala   |  8 +--
 core/src/main/scala/kafka/server/KafkaServer.scala |  4 +-
 .../server/AbstractCreateTopicsRequestTest.scala   | 12 +---
 .../scala/unit/kafka/server/BaseRequestTest.scala  | 14 ++++
 .../kafka/server/DeleteTopicsRequestTest.scala     | 83 +++++++++++++++++-----
 .../kafka/server/common/MetadataVersion.java       |  2 +-
 7 files changed, 91 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json 
b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index ce165c1616b..97881be27ff 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -28,7 +28,7 @@
   //
   // Version 5 adds Topic ID and Type to the TopicStates, as described in 
KIP-516.
   //
-  // Version 6 adds ElectionState as described in KIP-704.
+  // Version 6 adds LeaderRecoveryState as described in KIP-704.
   "validVersions": "0-6",
   "flexibleVersions": "4+",
   "fields": [
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index baac815b827..45c22435e68 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -87,7 +87,7 @@ class KafkaController(val config: KafkaConfig,
   @volatile private var brokerInfo = initialBrokerInfo
   @volatile private var _brokerEpoch = initialBrokerEpoch
 
-  private val isAlterIsrEnabled = 
config.interBrokerProtocolVersion.isAlterIsrSupported
+  private val isAlterPartitionEnabled = 
config.interBrokerProtocolVersion.isAlterPartitionSupported
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, 
inControllerContext = true, None)
   val controllerContext = new ControllerContext
   var controllerChannelManager = new 
ControllerChannelManager(controllerContext, config, time, metrics,
@@ -802,7 +802,7 @@ class KafkaController(val config: KafkaConfig,
         stopRemovedReplicasOfReassignedPartition(topicPartition, 
unneededReplicas)
     }
 
-    if (!isAlterIsrEnabled) {
+    if (!isAlterPartitionEnabled) {
       val reassignIsrChangeHandler = new 
PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
       zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
     }
@@ -1110,7 +1110,7 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
-    if (!isAlterIsrEnabled) {
+    if (!isAlterPartitionEnabled) {
       controllerContext.partitionsBeingReassigned.foreach { tp =>
         val path = TopicPartitionStateZNode.path(tp)
         zkClient.unregisterZNodeChangeHandler(path)
@@ -1121,7 +1121,7 @@ class KafkaController(val config: KafkaConfig,
   private def removePartitionFromReassigningPartitions(topicPartition: 
TopicPartition,
                                                        assignment: 
ReplicaAssignment): Unit = {
     if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-      if (!isAlterIsrEnabled) {
+      if (!isAlterPartitionEnabled) {
         val path = TopicPartitionStateZNode.path(topicPartition)
         zkClient.unregisterZNodeChangeHandler(path)
       }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5cd16ce1660..c69f43c9eff 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -310,7 +310,7 @@ class KafkaServer(
         socketServer.startup(startProcessingRequests = false)
 
         // Start alter partition manager based on the IBP version
-        alterIsrManager = if 
(config.interBrokerProtocolVersion.isAlterIsrSupported) {
+        alterIsrManager = if 
(config.interBrokerProtocolVersion.isAlterPartitionSupported) {
           AlterPartitionManager(
             config = config,
             metadataCache = metadataCache,
@@ -325,7 +325,7 @@ class KafkaServer(
         }
         alterIsrManager.start()
 
-        /* start replica manager */
+        // Start replica manager
         _replicaManager = createReplicaManager(isShuttingDown)
         replicaManager.startup()
 
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 91ff1d577da..ecee2cd19c4 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -91,16 +91,8 @@ abstract class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
     topic
   }
 
-  def createTopicsSocketServer: SocketServer = {
-    if (isKRaftTest()) {
-      anySocketServer
-    } else {
-      controllerSocketServer
-    }
-  }
-
   protected def validateValidCreateTopicsRequests(request: 
CreateTopicsRequest): Unit = {
-    val response = sendCreateTopicRequest(request, createTopicsSocketServer)
+    val response = sendCreateTopicRequest(request, adminSocketServer)
 
     assertFalse(response.errorCounts().keySet().asScala.exists(_.code() > 0),
       s"There should be no errors, found 
${response.errorCounts().keySet().asScala.mkString(", ")},")
@@ -162,7 +154,7 @@ abstract class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
   protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
                                                   expectedResponse: 
Map[String, ApiError],
                                                   checkErrorMessage: Boolean = 
true): Unit = {
-    val response = sendCreateTopicRequest(request, createTopicsSocketServer)
+    val response = sendCreateTopicRequest(request, adminSocketServer)
     assertEquals(expectedResponse.size, response.data().topics().size, "The 
response size should match")
 
     expectedResponse.foreach { case (topicName, expectedError) =>
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index eee4608f74c..7d1f3eca185 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -83,6 +83,20 @@ abstract class BaseRequestTest extends 
IntegrationTestHarness {
     }.map(_.socketServer).getOrElse(throw new IllegalStateException(s"Could 
not find broker with id $brokerId"))
   }
 
+  /**
+   * Return the socket server to which admin requests should be sent.
+   *
+   * For KRaft clusters that is any broker, because the broker forwards the request to the active
+   * controller. For legacy clusters that is the controller broker.
+   */
+  def adminSocketServer: SocketServer = {
+    if (isKRaftTest()) {
+      anySocketServer
+    } else {
+      controllerSocketServer
+    }
+  }
+
   def connect(socketServer: SocketServer = anySocketServer,
               listenerName: ListenerName = listenerName): Socket = {
     new Socket("localhost", socketServer.boundPort(listenerName))
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index a17612170d7..9137558437b 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -17,24 +17,28 @@
 
 package kafka.server
 
-import java.util.{Arrays, Collections}
-
+import java.util.Arrays
+import java.util.Collections
 import kafka.network.SocketServer
 import kafka.utils._
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{DeleteTopicsRequest, 
DeleteTopicsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.requests.DeleteTopicsRequest
+import org.apache.kafka.common.requests.DeleteTopicsResponse
+import org.apache.kafka.common.requests.MetadataRequest
+import org.apache.kafka.common.requests.MetadataResponse
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import scala.jdk.CollectionConverters._
 
-class DeleteTopicsRequestTest extends BaseRequestTest {
+class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
 
-  @Test
-  def testValidDeleteTopicRequests(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testValidDeleteTopicRequests(quorum: String): Unit = {
     val timeout = 10000
     // Single topic
     createTopic("topic-1", 1, 1)
@@ -66,6 +70,11 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     val response = sendDeleteTopicsRequest(request)
     val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
     assertTrue(error.isEmpty, s"There should be no errors, found 
${response.data.responses.asScala}")
+
+    if (isKRaftTest()) {
+      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
+    }
+
     request.data.topicNames.forEach { topic =>
       validateTopicIsDeleted(topic)
     }
@@ -75,13 +84,22 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     val response = sendDeleteTopicsRequest(request)
     val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
     assertTrue(error.isEmpty, s"There should be no errors, found 
${response.data.responses.asScala}")
+
+    if (isKRaftTest()) {
+      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
+    }
+
     response.data.responses.forEach { response =>
       validateTopicIsDeleted(response.name())
     }
   }
 
-  @Test
-  def testErrorDeleteTopicRequests(): Unit = {
+  /*
+   * Only run this test against ZK clusters. The KRaft controller doesn't perform operations that have timed out.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testErrorDeleteTopicRequests(quorum: String): Unit = {
     val timeout = 30000
     val timeoutTopic = "invalid-timeout"
 
@@ -103,14 +121,14 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
         "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
       )
     )
-    
+
     // Topic IDs
     createTopic("topic-id-1", 1, 1)
     val validId = getTopicIds()("topic-id-1")
     val invalidId = Uuid.randomUuid
     validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder(
       new DeleteTopicsRequestData()
-        .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId), 
+        .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId),
             new DeleteTopicState().setTopicId(validId)))
         .setTimeoutMs(timeout)).build(),
       Map(
@@ -128,7 +146,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
           .setTimeoutMs(0)).build(),
       Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
     // The topic should still get deleted eventually
-    TestUtils.waitUntilTrue(() => 
!servers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is 
never deleted")
+    TestUtils.waitUntilTrue(() => 
!brokers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is 
never deleted")
     validateTopicIsDeleted(timeoutTopic)
   }
 
@@ -166,8 +184,13 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testNotController(): Unit = {
+  /*
+   * Only run this test against ZK clusters. KRaft doesn't have this behavior 
of returning NOT_CONTROLLER.
+   * Instead, the request is forwarded.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testNotController(quorum: String): Unit = {
     val request = new DeleteTopicsRequest.Builder(
         new DeleteTopicsRequestData()
           .setTopicNames(Collections.singletonList("not-controller"))
@@ -185,8 +208,36 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
       s"The topic $topic should not exist")
   }
 
-  private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, 
socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = {
+  private def sendDeleteTopicsRequest(
+    request: DeleteTopicsRequest,
+    socketServer: SocketServer = adminSocketServer
+  ): DeleteTopicsResponse = {
     connectAndReceive[DeleteTopicsResponse](request, destination = 
socketServer)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testDeleteTopicsVersions(quorum: String): Unit = {
+    // This test assumes that the current valid versions are 0-6; please adjust the test if that changes.
+    assertEquals(0, DeleteTopicsRequestData.LOWEST_SUPPORTED_VERSION)
+    assertEquals(6, DeleteTopicsRequestData.HIGHEST_SUPPORTED_VERSION)
+
+    val timeout = 10000
+    (0 until DeleteTopicsRequestData.SCHEMAS.size).foreach { version =>
+      info(s"Creating and deleting tests for version $version")
+
+      val topicName = s"topic-$version"
+
+      createTopic(topicName, 1, 1)
+      val data = new DeleteTopicsRequestData().setTimeoutMs(timeout)
+
+      if (version < 6) {
+        data.setTopicNames(Arrays.asList(topicName))
+      } else {
+        data.setTopics(Arrays.asList(new 
DeleteTopicState().setName(topicName)))
+      }
+
+      validateValidDeleteTopicRequests(new 
DeleteTopicsRequest.Builder(data).build(version.toShort))
+    }
+  }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 0292dab1d28..e95b9248ec7 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -189,7 +189,7 @@ public enum MetadataVersion {
         return this.isAtLeast(IBP_2_7_IV1);
     }
 
-    public boolean isAlterIsrSupported() {
+    public boolean isAlterPartitionSupported() {
         return this.isAtLeast(IBP_2_7_IV2);
     }
 

Reply via email to