Repository: kafka
Updated Branches:
  refs/heads/trunk cb78ba129 -> 3de683993

KAFKA-4948; Wait for offset commit in test to fix transient failure

`DescribeConsumerGroupTest#testDescribeExistingGroupWithNoMembersWithNewConsumer` shuts down the consumer executor thread and then checks that the assignments returned by `describeGroup` contain the consumer group with no members. But if the executor thread is shut down before any offsets are committed, the assignments returned by `describeGroup` don't contain the group at all. This PR waits for an offset commit by waiting for the group to appear in `describeGroup` assignments prior to shutting down the executor.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3246 from rajinisivaram/KAFKA-4948


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3de68399
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3de68399
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3de68399

Branch: refs/heads/trunk
Commit: 3de683993dcc1f363cc2e1b177181036b96c2cee
Parents: cb78ba1
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Jun 6 17:07:23 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Tue Jun 6 17:07:23 2017 +0100

----------------------------------------------------------------------
 .../scala/unit/kafka/admin/DescribeConsumerGroupTest.scala | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3de68399/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index d3f9573..2c09cc4 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -186,6 +186,13 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         state == Some("Stable")
       }, "Expected the group to initially become stable.")
 
+    // Group assignments in describeGroup rely on finding committed consumer offsets.
+    // Wait for an offset commit before shutting down the group executor.
+    TestUtils.waitUntilTrue(() => {
+      val (_, assignments) = consumerGroupService.describeGroup()
+      assignments.exists(_.exists(_.group == group))
+    }, "Expected to find group in assignments after initial offset commit")
+
     // stop the consumer so the group has no active member anymore
     consumerGroupExecutor.shutdown()
 
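
The fix above depends on polling a condition until it holds before moving on. The following is a minimal, self-contained sketch of that pattern, not Kafka's `TestUtils` implementation. The names `PollUntilSketch`, `pollUntil`, `timeoutMs`, and `pauseMs` are hypothetical, and the background thread only simulates an offset commit becoming visible.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object PollUntilSketch {
  // Hypothetical stand-in for a polling test helper (assumed shape, not Kafka code).
  // Re-evaluates `condition` until it returns true, or fails with `msg`
  // once `timeoutMs` has elapsed.
  def pollUntil(condition: () => Boolean, msg: String,
                timeoutMs: Long = 5000L, pauseMs: Long = 10L): Unit = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    while (!condition()) {
      if (System.nanoTime() > deadline)
        throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulated "offset commit": becomes visible a short time after startup.
    val committed = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(50L)
        committed.set(true)
      }
    })
    worker.start()

    // Wait for the commit to be observable before stopping the worker,
    // mirroring the ordering the test fix enforces.
    pollUntil(() => committed.get(), "Expected the simulated commit to become visible")
    worker.join()
    println("commit observed before shutdown")
  }
}
```

The ordering is the point of the sketch: the check that follows shutdown is only meaningful if the state it relies on was already observable before shutdown began.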