SAMZA-834: fix perf degradation due to frequent updates of Timer metrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adf4f39a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adf4f39a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adf4f39a Branch: refs/heads/samza-sql Commit: adf4f39af6666aeee1e8bc7f2c9ba6d208c6fa6e Parents: d859378 Author: Xinyu Liu <[email protected]> Authored: Tue Dec 8 14:56:54 2015 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Dec 8 14:56:54 2015 -0800 ---------------------------------------------------------------------- .../metrics/SlidingTimeWindowReservoir.java | 26 +++++++++++++++----- .../java/org/apache/samza/metrics/Timer.java | 12 +++++++++ .../metrics/TestSlidingTimeWindowReservoir.java | 15 ++++++++--- .../org/apache/samza/metrics/TestTimer.java | 2 +- .../apache/samza/container/TestRunLoop.scala | 18 +++++++++++++- 5 files changed, 61 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java index df54359..4116473 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java @@ -31,9 +31,9 @@ import org.apache.samza.util.Clock; public class SlidingTimeWindowReservoir implements Reservoir { /** - * Allow this amount of values to have the same updating time. 
+ * default collision buffer */ - private static final int TIME_COLLISION_BUFFER = 256; + private static final int DEFAULT_TIME_COLLISION_BUFFER = 1; /** * Run {@link #removeExpireValues} once every this amount of {@link #update}s @@ -46,8 +46,13 @@ public class SlidingTimeWindowReservoir implements Reservoir { private static final int DEFAULT_WINDOW_SIZE_MS = 300000; /** + * Allow this amount of values to have the same updating time. + */ + private final int collisionBuffer; + + /** * Size of the window. The unit is millisecond. It is as - * <code>TIME_COLLISION_BUFFER</code> times big as the original window size. + * <code>collisionBuffer</code> times big as the original window size. */ private final long windowMs; @@ -93,11 +98,16 @@ public class SlidingTimeWindowReservoir implements Reservoir { } public SlidingTimeWindowReservoir(long windowMs, Clock clock) { - this.windowMs = windowMs * TIME_COLLISION_BUFFER; + this(windowMs, DEFAULT_TIME_COLLISION_BUFFER, clock); + } + + public SlidingTimeWindowReservoir(long windowMs, int collisionBuffer, Clock clock) { + this.windowMs = windowMs * collisionBuffer; this.storage = new ConcurrentSkipListMap<Long, Long>(); this.count = new AtomicLong(); this.lastUpdatingTime = new AtomicLong(); this.clock = clock; + this.collisionBuffer = collisionBuffer; } @Override @@ -126,15 +136,19 @@ public class SlidingTimeWindowReservoir implements Reservoir { * value's, use the last updating time + 1 as the new updating time. This * operation guarantees all the updating times in the <code>storage</code> * strictly increment. No override happens before reaching the - * <code>TIME_COLLISION_BUFFER</code>. + * <code>collisionBuffer</code>. * * @return the updating time */ private long getUpdatingTime() { while (true) { long oldTime = lastUpdatingTime.get(); - long newTime = clock.currentTimeMillis() * TIME_COLLISION_BUFFER; + long newTime = clock.currentTimeMillis() * collisionBuffer; long updatingTime = newTime > oldTime ? 
newTime : oldTime + 1; + // make sure the updateTime doesn't overflow to the next millisecond + if (updatingTime == newTime + collisionBuffer) { + --updatingTime; + } // make sure no other threads modify the lastUpdatingTime if (lastUpdatingTime.compareAndSet(oldTime, updatingTime)) { return updatingTime; http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java index b49d147..96715e8 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java @@ -52,6 +52,18 @@ public class Timer implements Metric { } /** + * Construct a {@link Timer} with given window size and collision buffer + * + * @param name name of this timer + * @param windowMs the window size. unit is millisecond + * @param collisionBuffer amount of collisions allowed in one millisecond. 
+ * @param clock the clock for the reservoir + */ + public Timer(String name, long windowMs, int collisionBuffer, Clock clock) { + this(name, new SlidingTimeWindowReservoir(windowMs, collisionBuffer, clock)); + } + + /** * Construct a {@link Timer} with given {@link Reservoir} * * @param name name of this timer http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java index d392b32..aca0f9d 100644 --- a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java @@ -34,7 +34,7 @@ public class TestSlidingTimeWindowReservoir { @Test public void testUpdateSizeSnapshot() { SlidingTimeWindowReservoir slidingTimeWindowReservoir = - new SlidingTimeWindowReservoir(300, clock); + new SlidingTimeWindowReservoir(300, 8, clock); when(clock.currentTimeMillis()).thenReturn(0L); slidingTimeWindowReservoir.update(1L); @@ -55,20 +55,26 @@ public class TestSlidingTimeWindowReservoir { @Test public void testDuplicateTime() { SlidingTimeWindowReservoir slidingTimeWindowReservoir = - new SlidingTimeWindowReservoir(300, clock); - when(clock.currentTimeMillis()).thenReturn(0L); + new SlidingTimeWindowReservoir(300, 2, clock); + when(clock.currentTimeMillis()).thenReturn(1L); slidingTimeWindowReservoir.update(1L); slidingTimeWindowReservoir.update(2L); Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot(); assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L))); assertEquals(2, snapshot.getSize()); + + // update causes collision, will override the last update + slidingTimeWindowReservoir.update(3L); + snapshot = 
slidingTimeWindowReservoir.getSnapshot(); + assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 3L))); + assertEquals(2, snapshot.getSize()); } @Test public void testRemoveExpiredValues() { SlidingTimeWindowReservoir slidingTimeWindowReservoir = - new SlidingTimeWindowReservoir(300, clock); + new SlidingTimeWindowReservoir(300, 8, clock); when(clock.currentTimeMillis()).thenReturn(0L); slidingTimeWindowReservoir.update(1L); @@ -85,4 +91,5 @@ public class TestSlidingTimeWindowReservoir { assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L))); assertEquals(2, snapshot.getSize()); } + } http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java index 63c183f..8076e02 100644 --- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java @@ -42,7 +42,7 @@ public class TestTimer { @Test public void testDefaultTimerUpdateAndGetSnapshot() { - Timer timer = new Timer("test"); + Timer timer = new Timer("test", 300, clock); timer.update(1L); timer.update(2L); http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index b4d6f35..ad37447 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -19,10 +19,14 @@ package org.apache.samza.container + +import org.apache.samza.metrics.{Timer, 
SlidingTimeWindowReservoir, MetricsRegistryMap} +import org.apache.samza.util.Clock import org.junit.Test import org.junit.Assert._ import org.mockito.Matchers import org.mockito.Mockito._ +import org.mockito.internal.util.reflection.Whitebox import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.junit.AssertionsForJUnit @@ -183,7 +187,19 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat var now = 0L val consumers = mock[SystemConsumers] when(consumers.choose).thenReturn(envelope0) - val testMetrics = new SamzaContainerMetrics + val clock = new Clock { + var c = 0L + def currentTimeMillis: Long = { + c += 1L + c + } + } + val testMetrics = new SamzaContainerMetrics("test", new MetricsRegistryMap() { + override def newTimer(group: String, name: String) = { + newTimer(group, new Timer(name, new SlidingTimeWindowReservoir(300000, clock))) + } + }) + val runLoop = new RunLoop( taskInstances = getMockTaskInstances, consumerMultiplexer = consumers,
