Repository: kafka Updated Branches: refs/heads/trunk 89c67727c -> 03817d5a2
KAFKA-3529: Fix transient failure in testCommitAsync Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1234 from hachikuji/KAFKA-3529 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/03817d5a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/03817d5a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/03817d5a Branch: refs/heads/trunk Commit: 03817d5a26722c6e95647f6219abf2802b187c8d Parents: 89c6772 Author: Jason Gustafson <[email protected]> Authored: Mon Apr 18 12:49:36 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Apr 18 12:49:36 2016 -0700 ---------------------------------------------------------------------- .../test/scala/integration/kafka/api/BaseConsumerTest.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/03817d5a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 916a0ab..56dae76 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -77,10 +77,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { // check async commit callbacks val commitCallback = new CountConsumerCommitCallback() this.consumers(0).commitAsync(commitCallback) - - // shouldn't make progress until poll is invoked - Thread.sleep(10) - assertEquals(0, commitCallback.successCount) awaitCommitCallback(this.consumers(0), commitCallback) } @@ -331,11 +327,10 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback, count: Int = 1): Unit = { - val startCount = commitCallback.successCount val started = System.currentTimeMillis() - while (commitCallback.successCount < startCount + count && System.currentTimeMillis() - started < 10000) + while (commitCallback.successCount < count && System.currentTimeMillis() - started < 10000) consumer.poll(50) - assertEquals(startCount + count, commitCallback.successCount) + assertEquals(count, commitCallback.successCount) } protected class CountConsumerCommitCallback extends OffsetCommitCallback {
