Repository: samza Updated Branches: refs/heads/master 357d6ca72 -> f8cce6e15
SAMZA-272, SAMZA-1440, SAMZA-1269: Fixed thread interrupt tests in TestExponentialSleepStrategy. Author: Prateek Maheshwari <pmahe...@linkedin.com> Reviewers: Jacob Maes <jma...@apache.org> Closes #318 from prateekm/ess-test-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f8cce6e1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f8cce6e1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f8cce6e1 Branch: refs/heads/master Commit: f8cce6e1571dd500f32b62fa510715d93e8986fb Parents: 357d6ca Author: Prateek Maheshwari <pmahe...@linkedin.com> Authored: Fri Oct 6 16:32:06 2017 -0700 Committer: Prateek Maheshwari <pmahe...@linkedin.com> Committed: Fri Oct 6 16:32:06 2017 -0700 ---------------------------------------------------------------------- .../samza/util/ExponentialSleepStrategy.scala | 3 +- .../util/TestExponentialSleepStrategy.scala | 46 +++++++++++++------- 2 files changed, 32 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f8cce6e1/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala index 4a04c13..da55371 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala @@ -73,7 +73,8 @@ class ExponentialSleepStrategy( * @param loopOperation The operation that should be attempted and may fail. * @param onException Handler function that determines what to do with an exception. * @return If loopOperation succeeded, an option containing the return value of - * the last invocation. If done was called in the exception hander, None. + * the last invocation. If done was called in the exception handler or the + * thread was interrupted, None. */ def run[A](loopOperation: RetryLoop => A, onException: (Exception, RetryLoop) => Unit): Option[A] = { val loop = startLoop http://git-wip-us.apache.org/repos/asf/samza/blob/f8cce6e1/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala index 546f41b..0514f8c 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala @@ -19,6 +19,8 @@ package org.apache.samza.util +import java.util.concurrent.{CountDownLatch, TimeUnit} + import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop import org.junit.Assert._ import org.junit.Test @@ -116,45 +118,57 @@ class TestExponentialSleepStrategy { assertEquals(0, loopObject.sleepCount) } - def interruptedThread(operation: => Unit) = { + def interruptedThread(operationStartLatch: CountDownLatch, operation: => Unit): Option[Throwable] = { var exception: Option[Throwable] = None val interruptee = new Thread(new Runnable { def run { try { operation } catch { case e: Exception => exception = Some(e) } } }) - interruptee.start - Thread.sleep(10) // give the thread a chance to make some progress before we interrupt it - interruptee.interrupt - interruptee.join + interruptee.start() + assertTrue("Operation start latch timed out.", operationStartLatch.await(1, TimeUnit.MINUTES)) + interruptee.interrupt() + interruptee.join() exception } - // TODO fix in SAMZA-1269 - // @Test - def testThreadInterruptInRetryLoop { + @Test def testThreadInterruptInRetryLoop { val strategy = new ExponentialSleepStrategy var iterations = 0 var loopObject: RetryLoop = null - val exception = interruptedThread { + val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once + val exception = interruptedThread( + loopStartLatch, strategy.run( - loop => { iterations += 1; loopObject = loop }, + loop => { loopObject = loop; loopStartLatch.countDown(); iterations += 1; }, (exception, loop) => throw exception ) - } - assertEquals(classOf[InterruptedException], exception.get.getClass) + ) + + // The interrupt can cause either, + // 1. the retry loop to exit with None result, no exception and isDone == false, or + // 2. the sleeping thread (during the back-off) to throw an InterruptedException. + assertTrue((!loopObject.isDone && exception.isEmpty) || + exception.get.getClass.equals(classOf[InterruptedException])) } @Test def testThreadInterruptInOperationSleep { val strategy = new ExponentialSleepStrategy var iterations = 0 var loopObject: RetryLoop = null - val exception = interruptedThread { + val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once + val exception = interruptedThread( + loopStartLatch, strategy.run( - loop => { iterations += 1; loopObject = loop; Thread.sleep(1000) }, + loop => { loopObject = loop; iterations += 1; loopStartLatch.countDown(); Thread.sleep(1000) }, (exception, loop) => throw exception ) - } - assertEquals(classOf[InterruptedException], exception.get.getClass) + ) + + // The interrupt can cause either, + // 1. the retry loop to exit with None result, no exception and isDone == false, or + // 2. the sleeping thread (in the operation or during the back-off) to throw an InterruptedException. + assertTrue((!loopObject.isDone && exception.isEmpty) || + exception.get.getClass.equals(classOf[InterruptedException])) } }