showuon commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r917485378
########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @ParameterizedTest + @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME)) Review Comment: It needs a full class name, to be passed into PARTITION_ASSIGNMENT_STRATEGY_CONFIG config, ex: `@ValueSource(strings = Array(classOf[CooperativeStickyAssignor].getName, classOf[RangeAssignor].getName))` ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @ParameterizedTest + @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME)) + def testRebalanceAndRejoin(assignmentStrategy: String): Unit = { + // create 2 consumers + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy) + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock = new ReentrantLock() + var generationId1 = -1 + var memberId1 = "" + val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { + if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) { + fail(s"Time out while awaiting for lock.") Review Comment: Are you sure this is correct? ``` Returns: true if the lock was free and was acquired by the current thread, or the lock was already held by the current thread; and false if the waiting time elapsed before the lock could be acquired ``` https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html#tryLock-long-java.util.concurrent.TimeUnit- Maybe it is like this: ``` if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) { fail(s"Time out while awaiting for lock.") } ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @ParameterizedTest + @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME)) + def testRebalanceAndRejoin(assignmentStrategy: String): Unit = { + // create 2 consumers + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy) + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock = new ReentrantLock() + var generationId1 = -1 + var memberId1 = "" + val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { + if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) { + fail(s"Time out while awaiting for lock.") + return Review Comment: `return` is not necessary because `fail` will throw exception directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org