This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 701f924352d KAFKA-15140: Use TestUtils methods and add logs for
assertion failure at TopicCommandIntegrationTest (#13950)
701f924352d is described below
commit 701f924352da1225a881f0f78f19ddf51485030a
Author: DL1231 <[email protected]>
AuthorDate: Tue Jul 4 22:02:39 2023 +0800
KAFKA-15140: Use TestUtils methods and add logs for assertion failure at
TopicCommandIntegrationTest (#13950)
This commit utilizes TestUtils methods to create a topic and adds logs when
assertions fail.
Reviewers: Divij Vaidya <[email protected]>
---------
Co-authored-by: d00791190 <[email protected]>
---
.../kafka/admin/TopicCommandIntegrationTest.scala | 169 ++++++++-------------
1 file changed, 65 insertions(+), 104 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index ea4c748da92..9e1beaf6adf 100644
---
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -54,7 +54,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
* `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
*/
override def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(
- numConfigs = 6,
+ numConfigs = numBrokers,
zkConnect = zkConnectOrNull,
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4
-> "rack3", 5 -> "rack3"),
numPartitions = numPartitions,
@@ -66,6 +66,8 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
private val numPartitions = 1
private val defaultReplicationFactor = 1.toShort
+ private val numBrokers = 6
+ private val lineSeparator = System.lineSeparator()
private var topicService: TopicService = _
private var adminClient: Admin = _
@@ -253,7 +255,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array())))
- assertTrue(output.contains(testTopicName))
+ assertTrue(output.contains(testTopicName), s"Unexpected output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -262,46 +264,36 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val topic1 = "kafka.testTopic1"
val topic2 = "kafka.testTopic2"
val topic3 = "oooof.testTopic1"
- adminClient.createTopics(
- List(new NewTopic(topic1, 2, 2.toShort),
- new NewTopic(topic2, 2, 2.toShort),
- new NewTopic(topic3, 2, 2.toShort)).asJavaCollection)
- .all().get()
- waitForTopicCreated(topic1)
- waitForTopicCreated(topic2)
- waitForTopicCreated(topic3)
+ TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array("--topic",
"kafka.*"))))
- assertTrue(output.contains(topic1))
- assertTrue(output.contains(topic2))
- assertFalse(output.contains(topic3))
+ assertTrue(output.contains(topic1), s"Unexpected output: $output")
+ assertTrue(output.contains(topic2), s"Unexpected output: $output")
+ assertFalse(output.contains(topic3), s"Unexpected output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testListTopicsWithExcludeInternal(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
- adminClient.createTopics(
- List(new NewTopic(topic1, 2, 2.toShort),
- new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 2,
2.toShort)).asJavaCollection)
- .all().get()
- waitForTopicCreated(topic1)
+ TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient,
Topic.GROUP_METADATA_TOPIC_NAME, brokers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new
TopicCommandOptions(Array("--exclude-internal"))))
- assertTrue(output.contains(topic1))
- assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
+ assertTrue(output.contains(topic1), s"Unexpected output: $output")
+ assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected
output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterPartitionCount(quorum: String): Unit = {
- adminClient.createTopics(
- List(new NewTopic(testTopicName, 2,
2.toShort)).asJavaCollection).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "3")))
@@ -316,9 +308,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignment(quorum: String): Unit = {
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, 2,
2.toShort))).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2",
"--partitions", "3")))
@@ -334,9 +324,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String):
Unit = {
- adminClient.createTopics(
- List(new NewTopic(testTopicName, 2,
2.toShort)).asJavaCollection).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
@@ -346,9 +334,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String):
Unit = {
- adminClient.createTopics(
- List(new NewTopic(testTopicName, 2,
2.toShort)).asJavaCollection).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
@@ -534,15 +520,13 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribe(quorum: String): Unit = {
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, 2,
2.toShort))).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic",
testTopicName))))
- val rows = output.split("\n")
- assertEquals(3, rows.size)
- assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
+ val rows = output.split(lineSeparator)
+ assertEquals(3, rows.size, s"Unexpected output: $output")
+ assertTrue(rows(0).startsWith(s"Topic: $testTopicName"), s"Unexpected
output: ${rows(0)}")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -561,9 +545,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnavailablePartitions(quorum: String): Unit = {
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, 6,
1.toShort))).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
numBrokers, 1)
try {
// check which partition is on broker 0 which we'll kill
@@ -594,9 +576,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--unavailable-partitions"))))
- val rows = output.split("\n")
- assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
- assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"))
+ val rows = output.split(lineSeparator)
+ assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected
output: ${rows(0)}")
+ assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"),
s"Unexpected output: ${rows(0)}")
} finally {
restartDeadBrokers()
}
@@ -605,9 +587,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, 1,
6.toShort))).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1,
numBrokers)
try {
killBroker(0)
@@ -618,7 +598,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--under-replicated-partitions"))))
- val rows = output.split("\n")
+ val rows = output.split(lineSeparator)
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected
output: ${rows(0)}")
} finally {
restartDeadBrokers()
@@ -628,12 +608,11 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderMinIsrPartitions(quorum: String): Unit = {
- val configMap = new java.util.HashMap[String, String]()
- configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6")
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
numBrokers.toString)
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, 1,
6.toShort).configs(configMap))).all().get()
- waitForTopicCreated(testTopicName)
+ // create topic
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1,
numBrokers, topicConfig = topicProps)
try {
killBroker(0)
@@ -647,8 +626,8 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--under-min-isr-partitions"))))
- val rows = output.split("\n")
- assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
+ val rows = output.split(lineSeparator)
+ assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected
output: ${rows(0)}")
} finally {
restartDeadBrokers()
}
@@ -657,14 +636,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum:
String): Unit = {
- val configMap = new java.util.HashMap[String, String]()
- val replicationFactor: Short = 1
- val partitions = 1
val tp = new TopicPartition(testTopicName, 0)
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, partitions,
replicationFactor).configs(configMap))).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
// Produce multiple batches.
TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages =
10, acks = -1)
@@ -693,9 +667,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
// describe the topic and test if it's under-replicated
val simpleDescribeOutput = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic",
testTopicName))))
- val simpleDescribeOutputRows = simpleDescribeOutput.split("\n")
- assertTrue(simpleDescribeOutputRows(0).startsWith(s"Topic:
$testTopicName"))
- assertEquals(2, simpleDescribeOutputRows.size)
+ val simpleDescribeOutputRows = simpleDescribeOutput.split(lineSeparator)
+ assertTrue(simpleDescribeOutputRows(0).startsWith(s"Topic:
$testTopicName"), s"Unexpected output: ${simpleDescribeOutputRows(0)}")
+ assertEquals(2, simpleDescribeOutputRows.size, s"Unexpected output:
$simpleDescribeOutput")
val underReplicatedOutput = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -712,12 +686,11 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeAtMinIsrPartitions(quorum: String): Unit = {
- val configMap = new java.util.HashMap[String, String]()
- configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, 1,
6.toShort).configs(configMap))).all().get()
- waitForTopicCreated(testTopicName)
+ // create topic
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1,
numBrokers, topicConfig = topicProps)
try {
killBroker(0)
@@ -734,9 +707,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--at-min-isr-partitions"))))
- val rows = output.split("\n")
- assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
- assertEquals(1, rows.length)
+ val rows = output.split(lineSeparator)
+ assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected
output: ${rows(0)}")
+ assertEquals(1, rows.length, s"Unexpected output: $output")
} finally {
restartDeadBrokers()
}
@@ -758,21 +731,14 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val notUnderMinIsrTopic = "not-under-min-isr-topic"
val offlineTopic = "offline-topic"
val fullyReplicatedTopic = "fully-replicated-topic"
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
numBrokers.toString)
- val configMap = new java.util.HashMap[String, String]()
- configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6")
-
- adminClient.createTopics(
- java.util.Arrays.asList(
- new NewTopic(underMinIsrTopic, 1, 6.toShort).configs(configMap),
- new NewTopic(notUnderMinIsrTopic, 1, 6.toShort),
- new NewTopic(offlineTopic, Collections.singletonMap(0,
Collections.singletonList(0))),
- new NewTopic(fullyReplicatedTopic, Collections.singletonMap(0,
java.util.Arrays.asList(1, 2, 3))))).all().get()
-
- waitForTopicCreated(underMinIsrTopic)
- waitForTopicCreated(notUnderMinIsrTopic)
- waitForTopicCreated(offlineTopic)
- waitForTopicCreated(fullyReplicatedTopic)
+ // create topic
+ TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, 1,
numBrokers, topicConfig = topicProps)
+ TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers,
1, numBrokers)
+ TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, 1,
replicaAssignment = Map(0 -> Seq(0)))
+ TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers,
1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))
try {
killBroker(0)
@@ -782,17 +748,17 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
TestUtils.waitUntilTrue(
() => aliveBrokers.forall(
broker =>
- broker.metadataCache.getPartitionInfo(underMinIsrTopic,
0).get.isr().size() < 6 &&
+ broker.metadataCache.getPartitionInfo(underMinIsrTopic,
0).get.isr().size() < numBrokers &&
broker.metadataCache.getPartitionInfo(offlineTopic,
0).get.leader() == MetadataResponse.NO_LEADER_ID),
"Timeout waiting for partition metadata propagating to brokers for
underMinIsrTopic topic"
)
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--under-min-isr-partitions"))))
- val rows = output.split("\n")
- assertTrue(rows(0).startsWith(s"\tTopic: $underMinIsrTopic"))
- assertTrue(rows(1).startsWith(s"\tTopic: $offlineTopic"))
- assertEquals(2, rows.length)
+ val rows = output.split(lineSeparator)
+ assertTrue(rows(0).startsWith(s"\tTopic: $underMinIsrTopic"),
s"Unexpected output: ${rows(0)}")
+ assertTrue(rows(1).startsWith(s"\tTopic: $offlineTopic"), s"Unexpected
output: ${rows(1)}")
+ assertEquals(2, rows.length, s"Unexpected output: $output")
} finally {
restartDeadBrokers()
}
@@ -822,12 +788,12 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
var output = TestUtils.grabConsoleOutput(topicService.describeTopic(new
TopicCommandOptions(
Array("--describe", "--exclude-internal"))))
assertTrue(output.contains(testTopicName), s"Output should have contained
$testTopicName")
- assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
+ assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected
output: $output")
// test list
output = TestUtils.grabConsoleOutput(topicService.listTopics(new
TopicCommandOptions(Array("--list", "--exclude-internal"))))
- assertTrue(output.contains(testTopicName))
- assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
+ assertTrue(output.contains(testTopicName), s"Unexpected output: $output")
+ assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected
output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -845,24 +811,19 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
Set(new TopicPartition(testTopicName, 0)).asJava
)
- adminClient.createTopics(
- Collections.singletonList(new NewTopic(testTopicName, 1, 1.toShort))
- ).all().get()
- waitForTopicCreated(testTopicName)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic",
testTopicName))))
- val rows = output.split("\n")
- assertEquals(2, rows.size)
- assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
+ val rows = output.split(lineSeparator)
+ assertEquals(2, rows.size, s"Unexpected output: $output")
+ assertTrue(rows(0).startsWith(s"Topic: $testTopicName"), s"Unexpected
output: ${rows(0)}")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithTopicNameCollision(quorum: String): Unit = {
- adminClient.createTopics(
- Collections.singletonList(new NewTopic("foo_bar", 1,
6.toShort))).all().get()
- waitForTopicCreated("foo_bar")
+ TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, 1,
numBrokers)
assertThrows(classOf[InvalidTopicException],
() => topicService.createTopic(new TopicCommandOptions(Array("--topic",
"foo.bar"))))