This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e94934b6b72 MINOR; DeleteTopics version tests (#12141)
e94934b6b72 is described below
commit e94934b6b7275c64cf4b9a63d8de88f326e591cd
Author: José Armando García Sancio <[email protected]>
AuthorDate: Thu May 12 13:04:48 2022 -0700
MINOR; DeleteTopics version tests (#12141)
Add a DeleteTopics test for all supported versions. Convert the
DeleteTopicsRequestTest to run against both ZK and KRaft mode.
Reviewers: Colin Patrick McCabe <[email protected]>, dengziming
<[email protected]>
---
.../common/message/LeaderAndIsrRequest.json | 2 +-
.../scala/kafka/controller/KafkaController.scala | 8 +--
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +-
.../server/AbstractCreateTopicsRequestTest.scala | 12 +---
.../scala/unit/kafka/server/BaseRequestTest.scala | 14 ++++
.../kafka/server/DeleteTopicsRequestTest.scala | 83 +++++++++++++++++-----
.../kafka/server/common/MetadataVersion.java | 2 +-
7 files changed, 91 insertions(+), 34 deletions(-)
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index ce165c1616b..97881be27ff 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -28,7 +28,7 @@
//
// Version 5 adds Topic ID and Type to the TopicStates, as described in
KIP-516.
//
- // Version 6 adds ElectionState as described in KIP-704.
+ // Version 6 adds LeaderRecoveryState as described in KIP-704.
"validVersions": "0-6",
"flexibleVersions": "4+",
"fields": [
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index baac815b827..45c22435e68 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -87,7 +87,7 @@ class KafkaController(val config: KafkaConfig,
@volatile private var brokerInfo = initialBrokerInfo
@volatile private var _brokerEpoch = initialBrokerEpoch
- private val isAlterIsrEnabled =
config.interBrokerProtocolVersion.isAlterIsrSupported
+ private val isAlterPartitionEnabled =
config.interBrokerProtocolVersion.isAlterPartitionSupported
private val stateChangeLogger = new StateChangeLogger(config.brokerId,
inControllerContext = true, None)
val controllerContext = new ControllerContext
var controllerChannelManager = new
ControllerChannelManager(controllerContext, config, time, metrics,
@@ -802,7 +802,7 @@ class KafkaController(val config: KafkaConfig,
stopRemovedReplicasOfReassignedPartition(topicPartition,
unneededReplicas)
}
- if (!isAlterIsrEnabled) {
+ if (!isAlterPartitionEnabled) {
val reassignIsrChangeHandler = new
PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
}
@@ -1110,7 +1110,7 @@ class KafkaController(val config: KafkaConfig,
}
private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
- if (!isAlterIsrEnabled) {
+ if (!isAlterPartitionEnabled) {
controllerContext.partitionsBeingReassigned.foreach { tp =>
val path = TopicPartitionStateZNode.path(tp)
zkClient.unregisterZNodeChangeHandler(path)
@@ -1121,7 +1121,7 @@ class KafkaController(val config: KafkaConfig,
private def removePartitionFromReassigningPartitions(topicPartition:
TopicPartition,
assignment:
ReplicaAssignment): Unit = {
if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
- if (!isAlterIsrEnabled) {
+ if (!isAlterPartitionEnabled) {
val path = TopicPartitionStateZNode.path(topicPartition)
zkClient.unregisterZNodeChangeHandler(path)
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5cd16ce1660..c69f43c9eff 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -310,7 +310,7 @@ class KafkaServer(
socketServer.startup(startProcessingRequests = false)
// Start alter partition manager based on the IBP version
- alterIsrManager = if
(config.interBrokerProtocolVersion.isAlterIsrSupported) {
+ alterIsrManager = if
(config.interBrokerProtocolVersion.isAlterPartitionSupported) {
AlterPartitionManager(
config = config,
metadataCache = metadataCache,
@@ -325,7 +325,7 @@ class KafkaServer(
}
alterIsrManager.start()
- /* start replica manager */
+ // Start replica manager
_replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 91ff1d577da..ecee2cd19c4 100644
---
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -91,16 +91,8 @@ abstract class AbstractCreateTopicsRequestTest extends
BaseRequestTest {
topic
}
- def createTopicsSocketServer: SocketServer = {
- if (isKRaftTest()) {
- anySocketServer
- } else {
- controllerSocketServer
- }
- }
-
protected def validateValidCreateTopicsRequests(request:
CreateTopicsRequest): Unit = {
- val response = sendCreateTopicRequest(request, createTopicsSocketServer)
+ val response = sendCreateTopicRequest(request, adminSocketServer)
assertFalse(response.errorCounts().keySet().asScala.exists(_.code() > 0),
s"There should be no errors, found
${response.errorCounts().keySet().asScala.mkString(", ")},")
@@ -162,7 +154,7 @@ abstract class AbstractCreateTopicsRequestTest extends
BaseRequestTest {
protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
expectedResponse:
Map[String, ApiError],
checkErrorMessage: Boolean =
true): Unit = {
- val response = sendCreateTopicRequest(request, createTopicsSocketServer)
+ val response = sendCreateTopicRequest(request, adminSocketServer)
assertEquals(expectedResponse.size, response.data().topics().size, "The
response size should match")
expectedResponse.foreach { case (topicName, expectedError) =>
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index eee4608f74c..7d1f3eca185 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -83,6 +83,20 @@ abstract class BaseRequestTest extends
IntegrationTestHarness {
}.map(_.socketServer).getOrElse(throw new IllegalStateException(s"Could
not find broker with id $brokerId"))
}
+ /**
+ * Return the socket server where admin requests are to be sent.
+ *
+ * For KRaft clusters, that is any broker, as the broker will forward the
request to the active
+ * controller. For legacy (ZK) clusters, that is the controller broker.
+ */
+ def adminSocketServer: SocketServer = {
+ if (isKRaftTest()) {
+ anySocketServer
+ } else {
+ controllerSocketServer
+ }
+ }
+
def connect(socketServer: SocketServer = anySocketServer,
listenerName: ListenerName = listenerName): Socket = {
new Socket("localhost", socketServer.boundPort(listenerName))
diff --git
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index a17612170d7..9137558437b 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -17,24 +17,28 @@
package kafka.server
-import java.util.{Arrays, Collections}
-
+import java.util.Arrays
+import java.util.Collections
import kafka.network.SocketServer
import kafka.utils._
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{DeleteTopicsRequest,
DeleteTopicsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.requests.DeleteTopicsRequest
+import org.apache.kafka.common.requests.DeleteTopicsResponse
+import org.apache.kafka.common.requests.MetadataRequest
+import org.apache.kafka.common.requests.MetadataResponse
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
-class DeleteTopicsRequestTest extends BaseRequestTest {
+class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
- @Test
- def testValidDeleteTopicRequests(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testValidDeleteTopicRequests(quorum: String): Unit = {
val timeout = 10000
// Single topic
createTopic("topic-1", 1, 1)
@@ -66,6 +70,11 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
val response = sendDeleteTopicsRequest(request)
val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
assertTrue(error.isEmpty, s"There should be no errors, found
${response.data.responses.asScala}")
+
+ if (isKRaftTest()) {
+ TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
+ }
+
request.data.topicNames.forEach { topic =>
validateTopicIsDeleted(topic)
}
@@ -75,13 +84,22 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
val response = sendDeleteTopicsRequest(request)
val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
assertTrue(error.isEmpty, s"There should be no errors, found
${response.data.responses.asScala}")
+
+ if (isKRaftTest()) {
+ TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
+ }
+
response.data.responses.forEach { response =>
validateTopicIsDeleted(response.name())
}
}
- @Test
- def testErrorDeleteTopicRequests(): Unit = {
+ /*
+ * Only run this test against ZK cluster. The KRaft controller doesn't
perform operations that have timed out.
+ */
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk"))
+ def testErrorDeleteTopicRequests(quorum: String): Unit = {
val timeout = 30000
val timeoutTopic = "invalid-timeout"
@@ -103,14 +121,14 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
"partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
)
)
-
+
// Topic IDs
createTopic("topic-id-1", 1, 1)
val validId = getTopicIds()("topic-id-1")
val invalidId = Uuid.randomUuid
validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()
- .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId),
+ .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId),
new DeleteTopicState().setTopicId(validId)))
.setTimeoutMs(timeout)).build(),
Map(
@@ -128,7 +146,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
.setTimeoutMs(0)).build(),
Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
// The topic should still get deleted eventually
- TestUtils.waitUntilTrue(() =>
!servers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is
never deleted")
+ TestUtils.waitUntilTrue(() =>
!brokers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is
never deleted")
validateTopicIsDeleted(timeoutTopic)
}
@@ -166,8 +184,13 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
}
}
- @Test
- def testNotController(): Unit = {
+ /*
+ * Only run this test against ZK clusters. KRaft doesn't have this behavior
of returning NOT_CONTROLLER.
+ * Instead, the request is forwarded.
+ */
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk"))
+ def testNotController(quorum: String): Unit = {
val request = new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()
.setTopicNames(Collections.singletonList("not-controller"))
@@ -185,8 +208,36 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
s"The topic $topic should not exist")
}
- private def sendDeleteTopicsRequest(request: DeleteTopicsRequest,
socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = {
+ private def sendDeleteTopicsRequest(
+ request: DeleteTopicsRequest,
+ socketServer: SocketServer = adminSocketServer
+ ): DeleteTopicsResponse = {
connectAndReceive[DeleteTopicsResponse](request, destination =
socketServer)
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk"))
+ def testDeleteTopicsVersions(quorum: String): Unit = {
+ // This test assumes that the current valid versions are 0-6; please adjust
the test if there are changes.
+ assertEquals(0, DeleteTopicsRequestData.LOWEST_SUPPORTED_VERSION)
+ assertEquals(6, DeleteTopicsRequestData.HIGHEST_SUPPORTED_VERSION)
+
+ val timeout = 10000
+ (0 until DeleteTopicsRequestData.SCHEMAS.size).foreach { version =>
+ info(s"Creating and deleting tests for version $version")
+
+ val topicName = s"topic-$version"
+
+ createTopic(topicName, 1, 1)
+ val data = new DeleteTopicsRequestData().setTimeoutMs(timeout)
+
+ if (version < 6) {
+ data.setTopicNames(Arrays.asList(topicName))
+ } else {
+ data.setTopics(Arrays.asList(new
DeleteTopicState().setName(topicName)))
+ }
+
+ validateValidDeleteTopicRequests(new
DeleteTopicsRequest.Builder(data).build(version.toShort))
+ }
+ }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 0292dab1d28..e95b9248ec7 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -189,7 +189,7 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_2_7_IV1);
}
- public boolean isAlterIsrSupported() {
+ public boolean isAlterPartitionSupported() {
return this.isAtLeast(IBP_2_7_IV2);
}