aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924676087
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition } @Override - protected boolean onJoinPrepare(int generation, String memberId) { + protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId); - boolean onJoinPrepareAsyncCommitCompleted = false; + if (joinPrepareTimer == null) { + joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs); + } else { + joinPrepareTimer.update(); + } + // async commit offsets prior to rebalance if auto-commit enabled - RequestFuture<Void> future = maybeAutoCommitOffsetsAsync(); - // return true when - // 1. future is null, which means no commit request sent, so it is still considered completed - // 2. offset commit completed - // 3. offset commit failed with non-retriable exception - if (future == null) - onJoinPrepareAsyncCommitCompleted = true; - else if (future.succeeded()) - onJoinPrepareAsyncCommitCompleted = true; - else if (future.failed() && !future.isRetriable()) { - log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage()); - onJoinPrepareAsyncCommitCompleted = true; + // and there is no in-flight offset commit request + if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) { + autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync(); } + // wait for commit offset response before timer expired. + if (autoCommitOffsetRequestFuture != null) { + Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ? + timer : joinPrepareTimer; + client.poll(autoCommitOffsetRequestFuture, pollTimer); + timer.update(); + joinPrepareTimer.update(); + } + + // keep retrying the offset commit when: + // 1. offset commit haven't done (and joinPrepareTime not expired) + // 2. 
failed with retryable exception (and joinPrepareTime not expired) + // Otherwise, continue to revoke partitions, ex: + // 1. if joinPrepareTime has expired + // 2. if offset commit failed with no-retryable exception + // 3. if offset commit success + boolean onJoinPrepareAsyncCommitCompleted = true; + if (autoCommitOffsetRequestFuture != null) { + if (joinPrepareTimer.isExpired()) { + log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. Will continue to join group"); + } else if (!autoCommitOffsetRequestFuture.isDone()) { + onJoinPrepareAsyncCommitCompleted = false; + } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) { + log.debug("Asynchronous auto-commit of offsets failed with retryable error: {}. Will retry it.", + autoCommitOffsetRequestFuture.exception().getMessage()); + onJoinPrepareAsyncCommitCompleted = false; + } else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) { + log.error("Asynchronous auto-commit of offsets failed: {}. Will continue to join group.", + autoCommitOffsetRequestFuture.exception().getMessage()); + } + if (autoCommitOffsetRequestFuture.isDone()) { + autoCommitOffsetRequestFuture = null; + } + } + if (!onJoinPrepareAsyncCommitCompleted) { + timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs)); Review Comment: You are right; I updated every timer in my last commit.
:-) fixed, thanks ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @ParameterizedTest + @ValueSource(strings = Array( + "org.apache.kafka.clients.consumer.CooperativeStickyAssignor", + "org.apache.kafka.clients.consumer.RangeAssignor")) + def testRebalanceAndRejoin(assignmentStrategy: String): Unit = { + // create 2 consumers + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy) + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + val consumer1 = createConsumer() + val consumer2 = createConsumer() + + // create a new topic, have 2 partitions + val topic = "topic1" + val producer = createProducer() + val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100) + + assertEquals(0, consumer1.assignment().size) + assertEquals(0, consumer2.assignment().size) + + val lock = new ReentrantLock() + var generationId1 = -1 + var memberId1 = "" + val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { + if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) { + fail(s"Time out while awaiting for lock.") + } + try { + generationId1 = consumer1.groupMetadata().generationId() + memberId1 = consumer1.groupMetadata().memberId() + } finally { + lock.unlock() + } + } + } + val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener) + consumerPoller1.start() + TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment, + s"Timed out while awaiting expected assignment change to $expectedAssignment.") + + // 
Since the consumer1 already completed the rebalance, + // the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId + var stableGeneration = -1 + var stableMemberId1 = "" + if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) { + fail(s"Time out while awaiting for lock.") + } + try { + stableGeneration = generationId1 + stableMemberId1 = memberId1 + } finally { + lock.unlock() + } + + val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic)) + TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1, + s"Timed out while awaiting expected assignment change to 1.") Review Comment: fixed, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org