aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676347
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
+ @ParameterizedTest
+ @ValueSource(strings = Array(
+ "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+ "org.apache.kafka.clients.consumer.RangeAssignor"))
+ def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+ // create 2 consumers
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"rebalance-and-rejoin-group")
+
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
assignmentStrategy)
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true")
+ val consumer1 = createConsumer()
+ val consumer2 = createConsumer()
+
+ // create a new topic, have 2 partitions
+ val topic = "topic1"
+ val producer = createProducer()
+ val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+ assertEquals(0, consumer1.assignment().size)
+ assertEquals(0, consumer2.assignment().size)
+
+ val lock = new ReentrantLock()
+ var generationId1 = -1
+ var memberId1 = ""
+ val customRebalanceListener = new ConsumerRebalanceListener {
+ override def onPartitionsRevoked(partitions:
util.Collection[TopicPartition]): Unit = {
+ }
+ override def onPartitionsAssigned(partitions:
util.Collection[TopicPartition]): Unit = {
+ if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+ fail(s"Time out while awaiting for lock.")
+ }
+ try {
+ generationId1 = consumer1.groupMetadata().generationId()
+ memberId1 = consumer1.groupMetadata().memberId()
+ } finally {
+ lock.unlock()
+ }
+ }
+ }
+ val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic),
Set.empty, customRebalanceListener)
+ consumerPoller1.start()
+ TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() ==
expectedAssignment,
+ s"Timed out while awaiting expected assignment change to
$expectedAssignment.")
+
+ // Since the consumer1 already completed the rebalance,
+ // the `onPartitionsAssigned` rebalance listener will be invoked to set
the generationId and memberId
+ var stableGeneration = -1
+ var stableMemberId1 = ""
+ if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+ fail(s"Time out while awaiting for lock.")
+ }
+ try {
+ stableGeneration = generationId1
+ stableMemberId1 = memberId1
+ } finally {
+ lock.unlock()
+ }
+
+ val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2,
List(topic))
+ TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size ==
1,
+ s"Timed out while awaiting expected assignment change to 1.")
+ TestUtils.waitUntilTrue(() => consumerPoller2.consumerAssignment().size ==
1,
+ s"Timed out while awaiting expected assignment change to 1.")
+
+ if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+ fail(s"Time out while awaiting for lock.")
+ }
+ try {
+ if
(assignmentStrategy.equals(classOf[CooperativeStickyAssignor].getName)) {
+ // cooperative rebalance should rebalance twice before finally stable
+ assertEquals(stableGeneration + 2, generationId1)
+ } else {
+ // eager rebalance should rebalance once before finally stable
Review Comment:
fixed, thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]