Repository: incubator-samza Updated Branches: refs/heads/master 38d659b33 -> e603a2794
SAMZA-349: add Timer in Metrics Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e603a279 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e603a279 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e603a279 Branch: refs/heads/master Commit: e603a27947ed0940167c72ae4fbd8eacd8c7fcbf Parents: 38d659b Author: Yan Fang <[email protected]> Authored: Mon Aug 4 23:48:02 2014 -0700 Committer: Yan Fang <[email protected]> Committed: Mon Aug 4 23:48:02 2014 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../apache/samza/metrics/MetricsRegistry.java | 16 ++ .../apache/samza/metrics/MetricsVisitor.java | 4 + .../ReadableMetricsRegistryListener.java | 2 + .../org/apache/samza/metrics/Reservoir.java | 46 ++++++ .../metrics/SlidingTimeWindowReservoir.java | 150 +++++++++++++++++++ .../java/org/apache/samza/metrics/Snapshot.java | 96 ++++++++++++ .../java/org/apache/samza/metrics/Timer.java | 98 ++++++++++++ .../apache/samza/util/NoOpMetricsRegistry.java | 13 +- .../metrics/TestSlidingTimeWindowReservoir.java | 86 +++++++++++ .../org/apache/samza/metrics/TestSnapshot.java | 45 ++++++ .../org/apache/samza/metrics/TestTimer.java | 70 +++++++++ .../samza/util/TestNoOpMetricsRegistry.java | 11 +- .../apache/samza/metrics/MetricsHelper.scala | 6 +- .../samza/metrics/MetricsRegistryMap.scala | 13 ++ .../samza/metrics/reporter/JmxReporter.scala | 17 ++- .../reporter/MetricsSnapshotReporter.scala | 2 + .../webapp/ApplicationMasterRestServlet.scala | 3 + 18 files changed, 675 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 3ad5fe3..93ec947 100644 --- a/build.gradle +++ b/build.gradle @@ -102,6 +102,7 @@ project(':samza-api') { dependencies { testCompile "junit:junit:$junitVersion" + testCompile "org.mockito:mockito-all:$mockitoVersion" } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java index 1031e45..5a00d01 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java @@ -63,4 +63,20 @@ public interface MetricsRegistry { * @return Gauge was registered */ <T> Gauge<T> newGauge(String group, Gauge<T> value); + + /** + * Create and Register a new {@link org.apache.samza.metrics.Timer} + * @param group Group for this Timer + * @param name Name of to-be-created Timer + * @return New Timer instance + */ + Timer newTimer(String group, String name); + + /** + * Register existing {@link org.apache.samza.metrics.Timer} with this registry + * @param group Group for this Timer + * @param timer Existing Timer to register + * @return Timer that was registered + */ + Timer newTimer(String group, Timer timer); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java index f4f756a..75abfe7 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java @@ -29,11 +29,15 @@ public abstract class MetricsVisitor { public abstract <T> void gauge(Gauge<T> gauge); + public abstract void timer(Timer timer); + public void visit(Metric metric) { if (metric instanceof Counter) { counter((Counter) metric); } else if (metric instanceof Gauge<?>) { gauge((Gauge<?>) metric); + } else if (metric instanceof Timer) { + timer((Timer) metric); } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java index a16378f..739d68f 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java @@ -23,4 +23,6 @@ public interface ReadableMetricsRegistryListener { void onCounter(String group, Counter counter); void onGauge(String group, Gauge<?> gauge); + + void onTimer(String group, Timer timer); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java new file mode 100644 index 0000000..b45e433 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +/** + * A reservoir interface to store, update and display values + */ +public interface Reservoir { + /** + * Return the number of values in this reservoir + * + * @return the number of values; + */ + int size(); + + /** + * Update the reservoir with the new value + * + * @param new value + */ + void update(long value); + + /** + * Return a {@link Snapshot} of this reservoir + * + * @return a statistical snapshot of this reservoir + */ + Snapshot getSnapshot(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java new file mode 100644 index 0000000..df54359 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.samza.util.Clock; + +/** + * An implemented {@link Reservoir} used to store values that appear in a + * sliding time window + */ +public class SlidingTimeWindowReservoir implements Reservoir { + + /** + * Allow this amount of values to have the same updating time. + */ + private static final int TIME_COLLISION_BUFFER = 256; + + /** + * Run {@link #removeExpireValues} once every this amount of {@link #update}s + */ + private static final int REMOVE_IN_UPDATE_THRESHOLD = 256; + + /** + * default window size + */ + private static final int DEFAULT_WINDOW_SIZE_MS = 300000; + + /** + * Size of the window. The unit is millisecond. It is as + * <code>TIME_COLLISION_BUFFER</code> times big as the original window size. + */ + private final long windowMs; + + /** + * A concurrent map (value's updating time -> value) + */ + private final ConcurrentSkipListMap<Long, Long> storage; + + /** + * Total number of values updated in the reservoir. + */ + private final AtomicLong count; + + /** + * Updating time of the last value. + */ + private final AtomicLong lastUpdatingTime; + + private final Clock clock; + + /** + * Default constructor using default window size + */ + public SlidingTimeWindowReservoir() { + this(DEFAULT_WINDOW_SIZE_MS, new Clock() { + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + }); + } + + /** + * Construct the SlidingTimeWindowReservoir with window size + * + * @param windowMs the size of the window. unit is millisecond. + */ + public SlidingTimeWindowReservoir(long windowMs) { + this(windowMs, new Clock() { + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + }); + } + + public SlidingTimeWindowReservoir(long windowMs, Clock clock) { + this.windowMs = windowMs * TIME_COLLISION_BUFFER; + this.storage = new ConcurrentSkipListMap<Long, Long>(); + this.count = new AtomicLong(); + this.lastUpdatingTime = new AtomicLong(); + this.clock = clock; + } + + @Override + public int size() { + removeExpireValues(); + return storage.size(); + } + + @Override + public void update(long value) { + if (count.incrementAndGet() % REMOVE_IN_UPDATE_THRESHOLD == 0) { + removeExpireValues(); + } + storage.put(getUpdatingTime(), value); + } + + /** + * Remove the values that are earlier than current window + */ + private void removeExpireValues() { + storage.headMap(getUpdatingTime() - windowMs).clear(); + } + + /** + * Return the new updating time. If the new value's system time equals to last + * value's, use the last updating time + 1 as the new updating time. This + * operation guarantees all the updating times in the <code>storage</code> + * strictly increment. No override happens before reaching the + * <code>TIME_COLLISION_BUFFER</code>. + * + * @return the updating time + */ + private long getUpdatingTime() { + while (true) { + long oldTime = lastUpdatingTime.get(); + long newTime = clock.currentTimeMillis() * TIME_COLLISION_BUFFER; + long updatingTime = newTime > oldTime ? newTime : oldTime + 1; + // make sure no other threads modify the lastUpdatingTime + if (lastUpdatingTime.compareAndSet(oldTime, updatingTime)) { + return updatingTime; + } + } + } + + @Override + public Snapshot getSnapshot() { + removeExpireValues(); + return new Snapshot(storage.values()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java new file mode 100644 index 0000000..7666909 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A statistical snapshot of a collection of values + */ +public class Snapshot { + private final ArrayList<Long> values; + private final int size; + + public Snapshot(Collection<Long> values) { + this.values = new ArrayList<Long>(values); + this.size = values.size(); + Collections.sort(this.values); + } + + /** + * Get the maximum value in the collection + * + * @return maximum value + */ + public long getMax() { + if (size == 0) { + return 0; + } + return values.get(size - 1); + } + + /** + * Get the minimum value in the collection + * + * @return minimum value + */ + public long getMin() { + if (size == 0) { + return 0; + } + return values.get(0); + } + + /** + * Get the average of the values in the collection + * + * @return average value + */ + public double getAverage() { + if (size == 0) { + return 0; + } + double sum = 0; + for (long value : values) { + sum += value; + } + return sum / size; + } + + /** + * Get the number of values in the collection + * + * @return size of the collection + */ + public int getSize() { + return size; + } + + /** + * Return the entire list of values + * + * @return the list of values + */ + public ArrayList<Long> getValues() { + return (ArrayList<Long>) values.clone(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Timer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java new file mode 100644 index 0000000..b49d147 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import org.apache.samza.util.Clock; + +/** + * A timer metric that stores time duration and provides {@link Snapshot} of the + * durations. + */ +public class Timer implements Metric { + + private final String name; + private final Reservoir reservoir; + + /** + * Default constructor. It uses {@link SlidingTimeWindowReservoir} as the + * default reservoir. + * + * @param name name of this timer + */ + public Timer(String name) { + this(name, new SlidingTimeWindowReservoir()); + } + + /** + * Construct a {@link Timer} with given window size + * + * @param name name of this timer + * @param windowMs the window size. unit is millisecond + * @param clock the clock for the reservoir + */ + public Timer(String name, long windowMs, Clock clock) { + this(name, new SlidingTimeWindowReservoir(windowMs, clock)); + } + + /** + * Construct a {@link Timer} with given {@link Reservoir} + * + * @param name name of this timer + * @param reservoir the given reservoir + */ + public Timer(String name, Reservoir reservoir) { + this.name = name; + this.reservoir = reservoir; + } + + /** + * Add the time duration + * + * @param duration time duration + */ + public void update(long duration) { + if (duration > 0) { + reservoir.update(duration); + } + } + + /** + * Get the {@link Snapshot} + * + * @return a statistical snapshot + */ + public Snapshot getSnapshot() { + return reservoir.getSnapshot(); + } + + @Override + public void visit(MetricsVisitor visitor) { + visitor.timer(this); + } + + /** + * Get the name of the timer + * + * @return name of the timer + */ + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java index d7bc4a9..3df855c 100644 --- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java @@ -22,6 +22,7 @@ package org.apache.samza.util; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; /** * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be @@ -47,4 +48,14 @@ public class NoOpMetricsRegistry implements MetricsRegistry { public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) { return gauge; } -} + + @Override + public Timer newTimer(String group, String name) { + return new Timer(name); + } + + @Override + public Timer newTimer(String group, Timer timer) { + return timer; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java new file mode 100644 index 0000000..eb5043b --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import static org.junit.Assert.*; + +import java.util.Arrays; + +import org.apache.samza.util.Clock; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class TestSlidingTimeWindowReservoir { + + private final Clock clock = mock(Clock.class); + + @Test + public void testUpdateSizeSnapshot() { + SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock); + + when(clock.currentTimeMillis()).thenReturn(0L); + slidingTimeWindowReservoir.update(1L); + + when(clock.currentTimeMillis()).thenReturn(1L); + slidingTimeWindowReservoir.update(2L); + + when(clock.currentTimeMillis()).thenReturn(2L); + slidingTimeWindowReservoir.update(3L); + + assertEquals(3, slidingTimeWindowReservoir.size()); + + Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot(); + assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L))); + assertTrue(snapshot.getSize() == 3); + } + + @Test + public void testDuplicateTime() { + SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock); + when(clock.currentTimeMillis()).thenReturn(0L); + slidingTimeWindowReservoir.update(1L); + slidingTimeWindowReservoir.update(2L); + + Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot(); + assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L))); + assertTrue(snapshot.getSize() == 2); + } + + @Test + public void testRemoveExpiredValues() { + SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock); + when(clock.currentTimeMillis()).thenReturn(0L); + slidingTimeWindowReservoir.update(1L); + + when(clock.currentTimeMillis()).thenReturn(100L); + slidingTimeWindowReservoir.update(2L); + + when(clock.currentTimeMillis()).thenReturn(301L); + slidingTimeWindowReservoir.update(3L); + + when(clock.currentTimeMillis()).thenReturn(500L); + slidingTimeWindowReservoir.update(4L); + + Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot(); + assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L))); + assertTrue(snapshot.getSize() == 2); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java new file mode 100644 index 0000000..b7aecb2 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Arrays; + +import org.junit.Test; + +public class TestSnapshot { + + @Test + public void testGetMaxMinAverageSize() { + Snapshot snapshot = new Snapshot(Arrays.asList(1L, 2L, 3L, 4L, 5L)); + assertEquals(5, snapshot.getMax()); + assertEquals(1, snapshot.getMin()); + assertEquals(3, snapshot.getAverage(), 0); + assertEquals(5, snapshot.getSize()); + + Snapshot emptySnapshot = new Snapshot(new ArrayList<Long>()); + assertEquals(0, emptySnapshot.getMax()); + assertEquals(0, emptySnapshot.getMin()); + assertEquals(0, emptySnapshot.getAverage(), 0); + assertEquals(0, emptySnapshot.getSize()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java new file mode 100644 index 0000000..dcc3cb8 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import static org.junit.Assert.*; + +import java.util.Arrays; + +import org.apache.samza.util.Clock; +import org.junit.Test; + +public class TestTimer { + + // mock clock + private final Clock clock = new Clock() { + long value = 0; + + @Override + public long currentTimeMillis() { + return value += 100; + } + }; + + @Test + public void testDefaultTimerUpdateAndGetSnapshot() { + Timer timer = new Timer("test"); + timer.update(1L); + timer.update(2L); + + Snapshot snapshot = timer.getSnapshot(); + assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L))); + assertTrue(snapshot.getValues().size() == 2); + } + + @Test + public void testTimerWithDifferentWindowSize() { + Timer timer = new Timer("test", 300, clock); + timer.update(1L); + timer.update(2L); + timer.update(3L); + + Snapshot snapshot = timer.getSnapshot(); + assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L))); + assertTrue(snapshot.getValues().size() == 3); + + // the time is 500 for update(4L) because getSnapshot calls clock once + 3 + // updates that call clock 3 times + timer.update(4L); + Snapshot snapshot2 = timer.getSnapshot(); + assertTrue(snapshot2.getValues().containsAll(Arrays.asList(3L, 4L))); + assertTrue(snapshot2.getValues().size() == 2); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java index 78d2824..2d0034f 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java @@ -22,9 +22,9 @@ package org.apache.samza.util; import static org.junit.Assert.*; import org.junit.Test; - import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.Timer; public class TestNoOpMetricsRegistry { @Test @@ -37,6 +37,9 @@ public class TestNoOpMetricsRegistry { Gauge<String> gauge2 = registry.newGauge("testg", "b", "2"); Gauge<String> gauge3 = registry.newGauge("testg", "c", "3"); Gauge<String> gauge4 = registry.newGauge("testg2", "d", "4"); + Timer timer1 = registry.newTimer("testt", "a"); + Timer timer2 = registry.newTimer("testt", "b"); + Timer timer3 = registry.newTimer("testt2", "c"); counter1.inc(); counter2.inc(2); counter3.inc(4); @@ -44,6 +47,9 @@ public class TestNoOpMetricsRegistry { gauge2.set("6"); gauge3.set("7"); gauge4.set("8"); + timer1.update(1L); + timer2.update(2L); + timer3.update(3L); assertEquals(counter1.getCount(), 1); assertEquals(counter2.getCount(), 2); assertEquals(counter3.getCount(), 4); @@ -51,5 +57,8 @@ public class TestNoOpMetricsRegistry { assertEquals(gauge2.getValue(), "6"); assertEquals(gauge3.getValue(), "7"); assertEquals(gauge4.getValue(), "8"); + assertEquals(timer1.getSnapshot().getAverage(), 1, 0); + assertEquals(timer2.getSnapshot().getAverage(), 2, 0); + assertEquals(timer3.getSnapshot().getAverage(), 3, 0); } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala index 8043f37..e5d6b1e 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala @@ -23,7 +23,7 @@ import org.apache.samza.container.SamzaContainerMetrics /** * MetricsHelper is a little helper class to make it easy to register and - * manage counters and gauges. + * manage counters, gauges and timers. */ trait MetricsHelper { val group = this.getClass.getName @@ -48,6 +48,10 @@ trait MetricsHelper { }) } + def newTimer(name: String) = { + registry.newTimer(group, (getPrefix + name).toLowerCase) + } + /** * Returns a prefix for metric names. */ http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala index da83f20..aac241b 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala @@ -62,6 +62,19 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with newGauge(group, new Gauge[T](name, value)) } + def newTimer(group: String, timer: Timer) = { + debug("Add new timer %s %s %s." format (group, timer.getName, timer)) + putAndGetGroup(group).putIfAbsent(timer.getName, timer) + val realTimer = metrics.get(group).get(timer.getName).asInstanceOf[Timer] + listeners.foreach(_.onTimer(group, realTimer)) + realTimer + } + + def newTimer(group: String, name: String) = { + debug("Creating new timer %s %s." format (group, name)) + newTimer(group, new Timer(name)) + } + private def putAndGetGroup(group: String) = { metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric]) metrics.get(group) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala index 8814e68..d66efc2 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala @@ -26,6 +26,7 @@ import javax.management.ObjectName import org.apache.samza.config.Config import org.apache.samza.metrics.Counter import org.apache.samza.metrics.Gauge +import org.apache.samza.metrics.Timer import org.apache.samza.metrics.MetricsReporter import org.apache.samza.metrics.MetricsReporterFactory import org.apache.samza.metrics.ReadableMetricsRegistry @@ -47,8 +48,9 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { registry.getGroup(group).foreach { case (name, metric) => metric.visit(new MetricsVisitor { - def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry)))); + def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry)))) def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry)))) + def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry)))) }) } }) @@ -66,6 +68,10 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { def onGauge(group: String, gauge: Gauge[_]) { registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source))) } + + def onTimer(group: String, timer: Timer) { + registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source))) + } } } else { warn("Trying to re-register a registry for source %s. Ignoring." format source) @@ -137,6 +143,15 @@ class JmxCounter(c: org.apache.samza.metrics.Counter, on: ObjectName) extends Jm def objectName = on } +trait JmxTimerMBean extends MetricMBean { + def getAverageTime(): Double +} + +class JmxTimer(t: org.apache.samza.metrics.Timer, on: ObjectName) extends JmxTimerMBean { + def getAverageTime() = t.getSnapshot().getAverage() + def objectName = on +} + class JmxReporterFactory extends MetricsReporterFactory with Logging { def getMetricsReporter(name: String, containerName: String, config: Config) = { info("Creating JMX reporter with name %s." format name) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 9a56754..319c74d 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConversions._ import grizzled.slf4j.Logging import org.apache.samza.metrics.Counter import org.apache.samza.metrics.Gauge +import org.apache.samza.metrics.Timer import org.apache.samza.metrics.MetricsReporter import org.apache.samza.metrics.MetricsVisitor import org.apache.samza.metrics.ReadableMetricsRegistry @@ -122,6 +123,7 @@ class MetricsSnapshotReporter( metric.visit(new MetricsVisitor { def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long) def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object]) + def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double) }) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index d10dc38..27fbe2d 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -54,6 +54,9 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r def gauge[T](gauge: Gauge[T]) = groupMap.put(gauge.getName, gauge.getValue.asInstanceOf[java.lang.Object]) + + def timer(timer: Timer) = + groupMap.put(timer.getName, timer.getSnapshot().getAverage: java.lang.Double) }) }
