Repository: kafka
Updated Branches:
  refs/heads/trunk cb78ba129 -> 3de683993


KAFKA-4948; Wait for offset commit in test to fix transient failure

`DescribeConsumerGroupTest#testDescribeExistingGroupWithNoMembersWithNewConsumer`
 shuts down the consumer executor thread and then checks that the assignments 
returned by `describeGroup` contain the consume group with no members. But if 
the executor thread is shut down before any offsets are committed, the 
assignments returned by `describeGroup` doesn't contain the group at all. This 
PR waits for an offset commit by waiting for the group to appear in 
`describeGroup` assignments prior to shutting down the executor.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3246 from rajinisivaram/KAFKA-4948


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3de68399
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3de68399
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3de68399

Branch: refs/heads/trunk
Commit: 3de683993dcc1f363cc2e1b177181036b96c2cee
Parents: cb78ba1
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Jun 6 17:07:23 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Tue Jun 6 17:07:23 2017 +0100

----------------------------------------------------------------------
 .../scala/unit/kafka/admin/DescribeConsumerGroupTest.scala    | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3de68399/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala 
b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index d3f9573..2c09cc4 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -186,6 +186,13 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
       state == Some("Stable")
     }, "Expected the group to initially become stable.")
 
+    // Group assignments in describeGroup rely on finding committed consumer 
offsets.
+    // Wait for an offset commit before shutting down the group executor.
+    TestUtils.waitUntilTrue(() => {
+      val (_, assignments) = consumerGroupService.describeGroup()
+      assignments.exists(_.exists(_.group == group))
+    }, "Expected to find group in assignments after initial offset commit")
+
     // stop the consumer so the group has no active member anymore
     consumerGroupExecutor.shutdown()
 

Reply via email to