mumrah commented on code in PR #12456:
URL: https://github.com/apache/kafka/pull/12456#discussion_r932766733
##########
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala:
##########
@@ -345,8 +353,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
* Producer will attempt to send messages to the partition specified in each record, and should
* succeed as long as the partition is included in the metadata.
*/
- @Test
- def testSendBeforeAndAfterPartitionExpansion(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendBeforeAndAfterPartitionExpansion(quorum: String): Unit = {
+ resource(createAdminClient(brokers, listenerName)) { admin =>
Review Comment:
Interesting, never knew about `TestUtils.resource`.
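
For context, the `resource(...) { admin => ... }` call in the hunk above reads like a loan-pattern helper: it hands the resource to a block and closes it afterwards, even if the block throws. Below is a minimal standalone sketch of that pattern. It assumes the helper behaves this way and is not copied from `TestUtils`; `withResource` and `FakeAdmin` are hypothetical names used only for illustration.

```scala
// Hypothetical stand-in for a loan-pattern helper; not Kafka's TestUtils code.
object LoanPatternSketch {
  // Runs `body` with the resource and always closes the resource afterwards.
  def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()

  // Hypothetical resource that records whether close() was called.
  final class FakeAdmin extends AutoCloseable {
    var closed = false
    def describe(): String = "ok"
    override def close(): Unit = { closed = true }
  }

  def main(args: Array[String]): Unit = {
    val admin = new FakeAdmin
    val result = withResource(admin)(_.describe())
    println(s"result=$result closed=${admin.closed}")
  }
}
```

The benefit in a test is that the admin client cannot leak when an assertion inside the block fails.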
##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -918,6 +918,76 @@ object TestUtils extends Logging {
}
}
+ /**
+ * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
+ * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
+ * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
+ *
+ * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
+ * LeaderDuringDelete).
+ * @throws AssertionError if the expected condition is not true within the timeout.
+ */
+ def waitUntilLeaderIsElectedOrChangedWithAdmin(
+ admin: Admin,
+ topic: String,
+ partition: Int,
+ timeoutMs: Long = 30000L,
+ oldLeaderOpt: Option[Int] = None,
+ newLeaderOpt: Option[Int] = None
+ ): Int = {
+ require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
+ val startTime = System.currentTimeMillis()
+ val topicPartition = new TopicPartition(topic, partition)
+
+ trace(s"Waiting for leader to be elected or changed for partition $topicPartition, old leader is $oldLeaderOpt, " +
+ s"new leader is $newLeaderOpt")
+
+ var leader: Option[Int] = None
+ var electedOrChangedLeader: Option[Int] = None
+ while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {
Review Comment:
Would it simplify any of this to use `computeUntilTrue` or one of the other
helper "until" methods in TestUtils?
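
To make the suggestion concrete, here is a rough sketch of how the hand-rolled `while` loop could collapse into a single call to a polling helper. `pollUntil` is a hypothetical local stand-in whose signature is an assumption, not the actual `computeUntilTrue` in `TestUtils`, and `currentLeader` simulates the admin lookup rather than calling a real `Admin` client.

```scala
// Hypothetical stand-in for an "until" helper; the signature is an assumption.
object PollUntilSketch {
  // Re-evaluates `compute` until `done` holds or the timeout elapses.
  // Returns the last computed value and whether the condition was met.
  def pollUntil[T](compute: => T, timeoutMs: Long = 30000L, pauseMs: Long = 100L)(done: T => Boolean): (T, Boolean) = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var result = compute
    while (!done(result) && System.currentTimeMillis() < deadline) {
      Thread.sleep(math.min(pauseMs, timeoutMs))
      result = compute
    }
    (result, done(result))
  }

  def main(args: Array[String]): Unit = {
    // Simulated leader lookup: no leader on the first two polls, then broker 1.
    var polls = 0
    def currentLeader(): Option[Int] = {
      polls += 1
      if (polls < 3) None else Some(1)
    }

    val (leader, elected) = pollUntil(currentLeader(), timeoutMs = 2000L, pauseMs = 10L)(_.isDefined)
    println(s"leader=$leader elected=$elected polls=$polls")
  }
}
```

With a helper of this shape, the timing bookkeeping and the mutable `leader` / `electedOrChangedLeader` variables would live in one place, and the caller would only supply the lookup and the predicate for the old-leader or new-leader case.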