showuon commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r913457476
########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { + // 2 consumer using cooperative-sticky assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName) + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic1 = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock1 = new ReentrantLock() + var generationId1 = 0 Review Comment: I think the initial number should set a invalid number, ex: -1, to avoid the end generationId happen to be `0`, so that we don't catch any error in this test. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { + // 2 consumer using cooperative-sticky assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName) + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic1 = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock1 = new ReentrantLock() + var generationId1 = 0 + var memberId1 = "" + val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { + lock1.lock() Review Comment: I know in this simple test, there should not be deadlock happen, but I still think we should avoid any possibility here. Could we use `tryLock` instead of `lock`, to avoid to block the test forever? ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { + // 2 consumer using cooperative-sticky assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName) + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic1 = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock1 = new ReentrantLock() Review Comment: since we don't have `lock2`, could we name it as `lock` here? ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { + // 2 consumer using cooperative-sticky assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName) + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic1 = "topic1" Review Comment: Since we only need 1 topic in this test, we can use global `topic` variable here. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { + // 2 consumer using cooperative-sticky assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName) + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic1 = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock1 = new ReentrantLock() + var generationId1 = 0 + var memberId1 = "" + val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { + lock1.lock() + try { + generationId1 = consumer1.groupMetadata().generationId() + memberId1 = consumer1.groupMetadata().memberId() + } finally { + lock1.unlock() + } + } + } + val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic1), Set.empty, customRebalanceListener) + consumerPoller1.start() + TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment, + s"Timed out while awaiting expected assignment change to 2.") Review Comment: I don't think you're only waiting for the assignment size to 2 here, you also wait for the content of the assignment is expected. Maybe you can change the error message to: ``` s"Timed out while awaiting expected assignment change to $expectedAssignment." ``` ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { + // 2 consumer using cooperative-sticky assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName) + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic1 = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock1 = new ReentrantLock() + var generationId1 = 0 + var memberId1 = "" + val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { + lock1.lock() Review Comment: Same comments to below lock ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { + // 2 consumer using cooperative-sticky assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName) + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic1 = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock1 = new ReentrantLock() + var generationId1 = 0 + var memberId1 = "" + val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { + lock1.lock() + try { + generationId1 = consumer1.groupMetadata().generationId() + memberId1 = consumer1.groupMetadata().memberId() + } finally { + lock1.unlock() + } + } + } + val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic1), Set.empty, customRebalanceListener) + consumerPoller1.start() + TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment, + s"Timed out while awaiting expected assignment change to 2.") + + var stableGeneration = 0 Review Comment: 1. Same as above, change the initial number to -1 2. maybe we need to add a comment above the line to explain why we can make sure we already got the `generationId1` and `memberId1`. Ex: ``` // Since the consumer1 already completed the rebalance, the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId ``` ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @Test + def testCoopearativeAssigorAndRejoin(): Unit = { Review Comment: 1. Maybe rename to `testRebalanceAndRejoin` 2. Could we also test eager assignor? Ex: ``` @ParameterizedTest @ValueSource(strings = Array(classOf[CooperativeStickyAssignor].getName, classOf[RangeAssignor].getName)) def testRebalanceAndRejoin(partitionAssignmentStrategy: String): Unit = { ``` I think the only place we need to change is the last generationId should be +1 or +2, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org