Repository: samza
Updated Branches:
  refs/heads/master 312c1b17b -> 27c9e4c2e


http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
new file mode 100644
index 0000000..2659050
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import static junit.framework.Assert.*;
+
+import org.junit.Before;
+import org.mockito.Mockito;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TestThrottlingExecutor {
+  private static final Runnable NO_OP = new Runnable() {
+    @Override
+    public void run() {
+      // Do nothing.
+    }
+  };
+
+  private HighResolutionClock clock;
+  private ThrottlingExecutor executor;
+
+  @Before
+  public void setUp() {
+    clock = Mockito.mock(HighResolutionClock.class);
+    executor = new ThrottlingExecutor(clock);
+  }
+
+  @Test
+  public void testInitialState() {
+    ThrottlingExecutor throttler = new ThrottlingExecutor();
+    assertEquals(0, throttler.getPendingNanos());
+    assertEquals(1.0, throttler.getWorkFactor());
+  }
+
+  @Test
+  public void testSetWorkRate() {
+    executor.setWorkFactor(1.0);
+    assertEquals(1.0, executor.getWorkFactor());
+
+    executor.setWorkFactor(0.5);
+    assertEquals(0.5, executor.getWorkFactor());
+
+    executor.setWorkFactor(ThrottlingExecutor.MIN_WORK_FACTOR);
+    assertEquals(ThrottlingExecutor.MIN_WORK_FACTOR, executor.getWorkFactor());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testLessThan0PercentWorkRate() {
+    new ThrottlingExecutor().setWorkFactor(-0.1);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGreaterThan100PercentWorkRate() {
+    new ThrottlingExecutor().setWorkFactor(1.1);
+  }
+
+  @Test
+  public void test100PercentWorkRate() throws InterruptedException {
+    setWorkTime(TimeUnit.MILLISECONDS.toNanos(5));
+
+    executor.execute(NO_OP);
+
+    assertEquals(0L, executor.getPendingNanos());
+
+    // At 100% work rate sleep should not be called
+    Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong());
+  }
+
+  @Test
+  public void test50PercentWorkRate() throws InterruptedException {
+    executor.setWorkFactor(0.5);
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    setWorkTime(workTimeNanos);
+    // Sleep time is same as work time at 50% work rate
+    setExpectedAndActualSleepTime(workTimeNanos, workTimeNanos);
+    executor.execute(NO_OP);
+
+    verifySleepTime(workTimeNanos);
+    assertEquals(0L, executor.getPendingNanos());
+  }
+
+  @Test
+  public void testMinWorkRate() throws InterruptedException {
+    final double workFactor = ThrottlingExecutor.MIN_WORK_FACTOR;
+    executor.setWorkFactor(workFactor);
+
+    // The math to work out how much to multiply work time to get expected 
delay time
+    double workToDelayFactor = (1.0 - workFactor) / workFactor;
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    final long delayTimeNanos = (long) (workToDelayFactor * workTimeNanos);
+
+    setWorkTime(workTimeNanos);
+    setExpectedAndActualSleepTime(delayTimeNanos, delayTimeNanos);
+
+    executor.execute(NO_OP);
+
+    verifySleepTime(delayTimeNanos);
+    assertEquals(0, executor.getPendingNanos());
+  }
+
+  @Test
+  public void testSleepOvershoot() throws InterruptedException {
+    executor.setWorkFactor(0.5);
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    final long expectedDelayNanos = workTimeNanos;
+    final long actualDelayTimeNanos = TimeUnit.MILLISECONDS.toNanos(6);
+
+    setWorkTime(workTimeNanos);
+    setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayTimeNanos);
+
+    executor.execute(NO_OP);
+
+    verifySleepTime(expectedDelayNanos);
+    assertEquals(expectedDelayNanos - actualDelayTimeNanos, 
executor.getPendingNanos());
+  }
+
+  @Test
+  public void testSleepUndershoot() throws InterruptedException {
+    executor.setWorkFactor(0.5);
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    final long expectedDelayNanos = workTimeNanos;
+    final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(4);
+
+    setWorkTime(workTimeNanos);
+    setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayNanos);
+
+    executor.execute(NO_OP);
+
+    verifySleepTime(expectedDelayNanos);
+    assertEquals(expectedDelayNanos - actualDelayNanos, 
executor.getPendingNanos());
+  }
+
+  @Test
+  public void testApplyPendingSleepNanos() throws InterruptedException {
+    // This verifies that the executor tries to re-apply pending sleep time on 
the next execution.
+    executor.setWorkFactor(0.5);
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    final long actualDelayNanos1 = TimeUnit.MILLISECONDS.toNanos(4);
+    final long actualDelayNanos2 = TimeUnit.MILLISECONDS.toNanos(6);
+
+    // First execution
+    setWorkTime(workTimeNanos);
+    setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos1);
+
+    executor.execute(NO_OP);
+
+    verifySleepTime(workTimeNanos);
+    assertEquals(workTimeNanos - actualDelayNanos1, 
executor.getPendingNanos());
+
+
+    // Second execution
+    setWorkTime(workTimeNanos);
+    setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos2);
+
+    executor.execute(NO_OP);
+
+    verifySleepTime(workTimeNanos);
+    assertEquals(0L, executor.getPendingNanos());
+  }
+
+  @Test
+  public void testDecreaseWorkFactor() {
+    executor.setWorkFactor(0.5);
+    executor.setPendingNanos(5000);
+
+    executor.setWorkFactor(0.3);
+    assertEquals(5000, executor.getPendingNanos());
+  }
+
+  @Test
+  public void testOverflowOfSleepNanos() throws InterruptedException {
+    executor.setWorkFactor(0.5);
+    executor.setPendingNanos(Long.MAX_VALUE);
+    assertEquals(Long.MAX_VALUE, executor.getPendingNanos());
+
+    // At a 50% work factor we'd expect work and sleep to match. As they 
don't, the function will
+    // try to increment the pending sleep nanos, which could (but should not) 
result in overflow.
+    setWorkTime(5000);
+
+    executor.execute(NO_OP);
+
+    // Expect sleep nanos to be clamped to the maximum long value
+    verifySleepTime(Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testNegativePendingNanos() throws InterruptedException {
+    executor.setWorkFactor(0.5);
+    executor.setPendingNanos(-1000);
+    assertEquals(-1000, executor.getPendingNanos());
+
+    // Note: we do not expect the delay time to be used because work time + 
pending delay is
+    // negative.
+    setWorkTime(500);
+
+    executor.execute(NO_OP);
+
+    // Sleep should not be called with negative pending nanos
+    Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong());
+    assertEquals(-1000 + 500, executor.getPendingNanos());
+  }
+
+  @Test
+  public void testNegativePendingNanosGoesPositive() throws 
InterruptedException {
+    executor.setWorkFactor(0.5);
+    long startPendingNanos = -1000;
+    executor.setPendingNanos(startPendingNanos);
+    assertEquals(-1000, executor.getPendingNanos());
+
+    setWorkTime(1250);
+
+    executor.execute(NO_OP);
+
+    verifySleepTime(1250 + startPendingNanos);
+    assertEquals(0, executor.getPendingNanos());
+  }
+
+  private void setWorkTime(long workTimeNanos) {
+    Mockito.when(clock.nanoTime()).thenReturn(0L).thenReturn(workTimeNanos);
+  }
+
+  private void setExpectedAndActualSleepTime(long expectedDelayTimeNanos, long 
actualDelayTimeNanos) throws InterruptedException {
+    Mockito.when(clock.sleep(expectedDelayTimeNanos))
+        .thenReturn(expectedDelayTimeNanos - actualDelayTimeNanos);
+  }
+
+  private void verifySleepTime(long expectedDelayTimeNanos) throws 
InterruptedException {
+    Mockito.verify(clock).sleep(expectedDelayTimeNanos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala 
b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 05b4e5c..b5c212a 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -93,4 +93,20 @@ class TestUtil {
     }
     assertTrue(throwSamzaException)
   }
+
+  @Test
+  def testClampAdd() {
+    assertEquals(0, Util.clampAdd(0, 0))
+    assertEquals(2, Util.clampAdd(1, 1))
+    assertEquals(-2, Util.clampAdd(-1, -1))
+    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 0))
+    assertEquals(Long.MaxValue - 1, Util.clampAdd(Long.MaxValue, -1))
+    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 1))
+    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, Long.MaxValue))
+    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, 0))
+    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, -1))
+    assertEquals(Long.MinValue + 1, Util.clampAdd(Long.MinValue, 1))
+    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, Long.MinValue))
+    assertEquals(-1, Util.clampAdd(Long.MaxValue, Long.MinValue))
+  }
 }

Reply via email to