showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r917485378


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = 
Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, 
RangeAssignor.RANGE_ASSIGNOR_NAME))

Review Comment:
   It needs a full class name, to be passed into 
PARTITION_ASSIGNMENT_STRATEGY_CONFIG config, ex:
   `@ValueSource(strings = Array(classOf[CooperativeStickyAssignor].getName, 
classOf[RangeAssignor].getName))`
   



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = 
Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, 
RangeAssignor.RANGE_ASSIGNOR_NAME))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"rebalance-and-rejoin-group")
+    
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
 assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
+        if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")

Review Comment:
   Are you sure this is correct?
   ```
   Returns:
       true if the lock was free and was acquired by the current thread, or the 
lock was already held by the current thread; and false if the waiting time 
elapsed before the lock could be acquired
   ```
   
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html#tryLock-long-java.util.concurrent.TimeUnit-
   
   Maybe it is like this:
   ```
   if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
     fail(s"Time out while awaiting for lock.")
   }



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = 
Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, 
RangeAssignor.RANGE_ASSIGNOR_NAME))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"rebalance-and-rejoin-group")
+    
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
 assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
+        if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+          return

Review Comment:
   `return` is not necessary because `fail` will throw exception directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to