Repository: samza Updated Branches: refs/heads/master 7fc39e521 -> f6a2d54da
Add a new ListGauge metric-type This PR introduces a ListGauge type. A subsequent PR: https://github.com/apache/samza/pull/543 shows how this is used in conjunction with a diagnostics appender for error-tracking and dumping to Kafka via SnapshotReporter. Author: Ray Matharu <rmath...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org>, Cameron Lee <ca...@linkedin.com> Closes #541 from rayman7718/listgauge Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f6a2d54d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f6a2d54d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f6a2d54d Branch: refs/heads/master Commit: f6a2d54dab51915aeb99d62b7c59523d543e42b7 Parents: 7fc39e5 Author: Ray Matharu <rmath...@linkedin.com> Authored: Wed Jun 13 21:01:53 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Wed Jun 13 21:01:53 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/metrics/ListGauge.java | 143 +++++++++++++++++++ .../apache/samza/metrics/MetricsRegistry.java | 9 ++ .../apache/samza/metrics/MetricsVisitor.java | 7 +- .../ReadableMetricsRegistryListener.java | 2 + .../apache/samza/util/NoOpMetricsRegistry.java | 7 + .../org/apache/samza/util/TimestampedValue.java | 61 ++++++++ .../org/apache/samza/metrics/TestListGauge.java | 99 +++++++++++++ .../org/apache/samza/metrics/TestTimer.java | 5 +- .../system/eventhub/TestMetricsRegistry.java | 16 ++- .../org/apache/samza/metrics/MetricGroup.java | 4 + .../functions/PartialJoinFunction.java | 2 +- .../samza/operators/impl/OperatorImplGraph.java | 2 +- .../operators/impl/PartialJoinOperatorImpl.java | 2 +- .../operators/impl/WindowOperatorImpl.java | 2 +- .../operators/impl/store/TimeSeriesStore.java | 2 + .../impl/store/TimeSeriesStoreImpl.java | 1 + .../operators/impl/store/TimestampedValue.java | 61 -------- .../impl/store/TimestampedValueSerde.java | 1 + 
.../samza/operators/spec/JoinOperatorSpec.java | 2 +- .../apache/samza/container/SamzaContainer.scala | 1 + .../samza/container/SamzaContainerMetrics.scala | 2 + .../apache/samza/metrics/MetricsHelper.scala | 6 +- .../samza/metrics/MetricsRegistryMap.scala | 15 ++ .../samza/metrics/reporter/JmxReporter.scala | 24 ++-- .../reporter/MetricsSnapshotReporter.scala | 24 +++- .../operators/impl/TestOperatorImplGraph.java | 2 +- .../impl/store/TestTimeSeriesStoreImpl.java | 1 + .../impl/store/TestTimestampedValueSerde.java | 1 + .../webapp/ApplicationMasterRestServlet.scala | 3 + 29 files changed, 413 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java new file mode 100644 index 0000000..545fd45 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.metrics; + +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; +import org.apache.samza.util.TimestampedValue; + + +/** + * A {@link ListGauge} is a {@link Metric} that buffers multiple instances of a type T in a list. + * {@link ListGauge}s are useful for maintaining, recording, or collecting values over time. + * For example, a set of specific logging-events (e.g., errors). + * + * Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation. + * Eviction happens during element addition or during reads of the ListGauge (getValues). + * + * All public methods are thread-safe. + * + */ +public class ListGauge<T> implements Metric { + private final String name; + private final Queue<TimestampedValue<T>> elements; + + private final int maxNumberOfItems; + private final Duration maxStaleness; + private final static int DEFAULT_MAX_NITEMS = 1000; + private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60); + + /** + * Create a new {@link ListGauge} that auto evicts based on the given maxNumberOfItems and maxStaleness parameters. 
+ * + * @param name Name to be assigned + * @param maxNumberOfItems The max number of items that can remain in the listgauge + * @param maxStaleness The max staleness of items permitted in the listgauge + */ + public ListGauge(String name, int maxNumberOfItems, Duration maxStaleness) { + this.name = name; + this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>(); + this.maxNumberOfItems = maxNumberOfItems; + this.maxStaleness = maxStaleness; + } + + /** + * Create a new {@link ListGauge} that auto evicts up to a max of 1000 items and a max-staleness of 60 minutes. + * @param name Name to be assigned + */ + public ListGauge(String name) { + this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS); + } + + /** + * Get the name assigned to this {@link ListGauge} + * @return the assigned name + */ + public String getName() { + return this.name; + } + + /** + * Get the Collection of values currently in the list. + * @return the collection of values + */ + public Collection<T> getValues() { + this.evict(); + return Collections.unmodifiableList(this.elements.stream().map(x -> x.getValue()).collect(Collectors.toList())); + } + + /** + * Add a value to the list. + * (Timestamp assigned to this value is the current timestamp.) + * @param value The Gauge value to be added + */ + public void add(T value) { + this.elements.add(new TimestampedValue<T>(value, Instant.now().toEpochMilli())); + + // perform any evictions that may be needed. + this.evict(); + } + + /** + * {@inheritDoc} + */ + @Override + public void visit(MetricsVisitor visitor) { + visitor.listGauge(this); + } + + /** + * Evicts entries from the elements list, based on the given item-size and durationThreshold. 
+ */ + private void evict() { + this.evictBasedOnSize(); + this.evictBasedOnTimestamp(); + } + + /** + * Evicts entries from elements in FIFO order until it has maxNumberOfItems + */ + private void evictBasedOnSize() { + int numToEvict = this.elements.size() - this.maxNumberOfItems; + while (numToEvict > 0) { + this.elements.poll(); // remove head + numToEvict--; + } + } + + /** + * Removes entries from elements to ensure no element has a timestamp more than maxStaleness before current timestamp. + */ + private void evictBasedOnTimestamp() { + Instant currentTimestamp = Instant.now(); + TimestampedValue<T> valueInfo = this.elements.peek(); + + // continue remove-head if currenttimestamp - head-element's timestamp > durationThreshold + while (valueInfo != null + && currentTimestamp.toEpochMilli() - valueInfo.getTimestamp() > this.maxStaleness.toMillis()) { + this.elements.poll(); + valueInfo = this.elements.peek(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java index 5a00d01..fa0fd39 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java @@ -65,6 +65,15 @@ public interface MetricsRegistry { <T> Gauge<T> newGauge(String group, Gauge<T> value); /** + * Register a {@link org.apache.samza.metrics.ListGauge} + * @param group Group for this ListGauge + * @param listGauge the ListGauge to register + * @param <T> Type of the ListGauge + * @return ListGauge registered + */ + <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge); + + /** * Create and Register a new {@link org.apache.samza.metrics.Timer} * @param group Group for this Timer * @param 
name Name of to-be-created Timer http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java index 75abfe7..49a0929 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java @@ -31,8 +31,13 @@ public abstract class MetricsVisitor { public abstract void timer(Timer timer); + public abstract <T> void listGauge(ListGauge<T> listGauge); + public void visit(Metric metric) { - if (metric instanceof Counter) { + // Cast for metrics of type ListGauge + if (metric instanceof ListGauge<?>) { + listGauge((ListGauge<?>) metric); + } else if (metric instanceof Counter) { counter((Counter) metric); } else if (metric instanceof Gauge<?>) { gauge((Gauge<?>) metric); http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java index 739d68f..ba5b182 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java @@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener { void onGauge(String group, Gauge<?> gauge); + void onListGauge(String group, ListGauge<?> listGauge); + void onTimer(String group, Timer timer); } 
http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java index 3df855c..76b8216 100644 --- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java @@ -21,9 +21,11 @@ package org.apache.samza.util; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.ListGauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; + /** * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be * recorded but a registry is still required. @@ -50,6 +52,11 @@ public class NoOpMetricsRegistry implements MetricsRegistry { } @Override + public <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge) { + return listGauge; + } + + @Override public Timer newTimer(String group, String name) { return new Timer(name); } http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java b/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java new file mode 100644 index 0000000..e767918 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.samza.util; + +/** + * An immutable pair of a value, and its corresponding timestamp. + * + * @param <V> the type of the value + */ +public class TimestampedValue<V> { + private final V value; + private final long timestamp; + + public TimestampedValue(V v, long timestamp) { + this.value = v; + this.timestamp = timestamp; + } + + public V getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !getClass().equals(o.getClass())) return false; + + TimestampedValue<?> that = (TimestampedValue<?>) o; + + if (timestamp != that.timestamp) return false; + return value != null ? value.equals(that.value) : (that.value == null); + } + + @Override + public int hashCode() { + int result = value != null ? 
value.hashCode() : 0; + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java new file mode 100644 index 0000000..eb91012 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.metrics; + +import java.time.Duration; +import java.util.Collection; +import java.util.Iterator; +import org.junit.Assert; +import org.junit.Test; + + +/** + * Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge} + */ +public class TestListGauge { + + private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10); + + private <T> ListGauge<T> getListGaugeForTest() { + return new ListGauge<T>("sampleListGauge", 10, Duration.ofSeconds(60)); + } + + @Test + public void basicTest() { + ListGauge<String> listGauge = getListGaugeForTest(); + listGauge.add("sampleValue"); + Assert.assertEquals("Names should be the same", listGauge.getName(), "sampleListGauge"); + Assert.assertEquals("List sizes should match", listGauge.getValues().size(), 1); + Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValues().contains("sampleValue"), true); + } + + @Test + public void testSizeEnforcement() { + ListGauge listGauge = getListGaugeForTest(); + for (int i = 15; i > 0; i--) { + listGauge.add("v" + i); + } + Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValues().size(), 10); + + int valueIndex = 10; + Collection<String> currentList = listGauge.getValues(); + Iterator iterator = currentList.iterator(); + while (iterator.hasNext()) { + String gaugeValue = (String) iterator.next(); + Assert.assertTrue(gaugeValue.equals("v" + valueIndex)); + valueIndex--; + } + } + + @Test + public void testThreadSafety() throws InterruptedException { + ListGauge<Integer> listGauge = getListGaugeForTest(); + + Thread thread1 = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= 100; i++) { + listGauge.add(i); + } + } + }); + + Thread thread2 = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= 100; i++) { + listGauge.add(i); + } + } + }); + + thread1.start(); + thread2.start(); + + 
thread1.join(THREAD_TEST_TIMEOUT.toMillis()); + thread2.join(THREAD_TEST_TIMEOUT.toMillis()); + + Assert.assertTrue("ListGauge should have the last 10 values", listGauge.getValues().size() == 10); + for (Integer gaugeValue : listGauge.getValues()) { + Assert.assertTrue("Values should have the last 10 range", gaugeValue <= 100 && gaugeValue > 90); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java index 8076e02..c694d3f 100644 --- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java @@ -19,13 +19,12 @@ package org.apache.samza.metrics; -import static org.junit.Assert.*; - import java.util.Arrays; - import org.apache.samza.util.Clock; import org.junit.Test; +import static org.junit.Assert.*; + public class TestTimer { /* http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java index d29b975..01b69ed 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java @@ -19,20 +19,21 @@ package org.apache.samza.system.eventhub; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.commons.collections4.map.HashedMap; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; +import 
org.apache.samza.metrics.ListGauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - public class TestMetricsRegistry implements MetricsRegistry { private Map<String, List<Counter>> counters = new HashedMap<>(); private Map<String, List<Gauge<?>>> gauges = new HashedMap<>(); + private Map<String, List<ListGauge>> listGauges = new HashedMap<>(); public List<Counter> getCounters(String groupName) { return counters.get(groupName); @@ -69,6 +70,13 @@ public class TestMetricsRegistry implements MetricsRegistry { } @Override + public ListGauge newListGauge(String group, ListGauge listGauge) { + listGauges.putIfAbsent(group, new ArrayList()); + listGauges.get(group).add(listGauge); + return listGauge; + } + + @Override public <T> Gauge<T> newGauge(String group, Gauge<T> value) { if (!gauges.containsKey(group)) { gauges.put(group, new ArrayList<>()); http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java index 53526d8..fc57846 100644 --- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java +++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java @@ -44,6 +44,10 @@ public class MetricGroup { return registry.newCounter(groupName, (prefix + name).toLowerCase()); } + public <T> ListGauge<T> newListGauge(String name) { + return registry.newListGauge(groupName, new ListGauge(name)); + } + public <T> Gauge<T> newGauge(String name, T value) { return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value)); } 
http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java index 5ede5e8..038abba 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java +++ b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java @@ -18,7 +18,7 @@ */ package org.apache.samza.operators.functions; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.storage.kv.KeyValueStore; /** http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 0f51798..df73e48 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -28,7 +28,7 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.BroadcastOperatorSpec; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; 
http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index 90a71a0..0cdde49 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -21,7 +21,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.storage.kv.KeyValueStore; http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 6b5baae..82dc0bf 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -28,7 +28,7 @@ import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.impl.store.TimeSeriesKey; import org.apache.samza.operators.impl.store.TimeSeriesStore; import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl; -import 
org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.triggers.FiringType; http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java index f3d6948..c9b694d 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java @@ -21,6 +21,8 @@ package org.apache.samza.operators.impl.store; import org.apache.samza.storage.kv.ClosableIterator; +import org.apache.samza.util.TimestampedValue; + /** * A key-value store that allows entries to be queried and stored based on time ranges. 
http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java index f03d396..10a5967 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java @@ -23,6 +23,7 @@ import org.apache.samza.storage.kv.ClosableIterator; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.util.TimestampedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java deleted file mode 100644 index 5e45148..0000000 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.samza.operators.impl.store; - -/** - * An immutable pair of a value, and its corresponding timestamp. - * - * @param <V> the type of the value - */ -public class TimestampedValue<V> { - private final V value; - private final long timestamp; - - public TimestampedValue(V v, long timestamp) { - this.value = v; - this.timestamp = timestamp; - } - - public V getValue() { - return value; - } - - public long getTimestamp() { - return timestamp; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || !getClass().equals(o.getClass())) return false; - - TimestampedValue<?> that = (TimestampedValue<?>) o; - - if (timestamp != that.timestamp) return false; - return value != null ? value.equals(that.value) : (that.value == null); - } - - @Override - public int hashCode() { - int result = value != null ? 
value.hashCode() : 0; - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - return result; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java index b14f8a4..5b0cdac 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java @@ -21,6 +21,7 @@ package org.apache.samza.operators.impl.store; import org.apache.samza.serializers.Serde; import java.nio.ByteBuffer; +import org.apache.samza.util.TimestampedValue; public class TimestampedValueSerde<V> implements Serde<TimestampedValue<V>> { http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index a218135..1b55784 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -23,7 +23,7 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimestampedValueSerde; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import 
org.apache.samza.serializers.Serde; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 38d0d9c..be0fb26 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -30,6 +30,7 @@ import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index c122956..a26e666 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -48,6 +48,8 @@ class SamzaContainerMetrics( val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]() + val exceptions = newListGauge[String]("exceptions") + def addStoreRestorationGauge(taskName: TaskName, storeName: String) { 
taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L)) } http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala index 1520b0e..21ec763 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala @@ -40,9 +40,11 @@ trait MetricsHelper { def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value) + def newListGauge[T](name: String) = metricGroup.newListGauge[T](name) + /** - * Specify a dynamic gauge that always returns the latest value when polled. - * The value closure must be thread safe, since metrics reporters may access + * Specify a dynamic gauge that always returns the latest value when polled. + * The value closure must be thread safe, since metrics reporters may access * it from another thread. 
*/ def newGauge[T](name: String, value: () => T) = { http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala index 40ffee2..75ed6aa 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala @@ -75,6 +75,21 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with newTimer(group, new Timer(name)) } + /** + * Register a {@link org.apache.samza.metrics.ListGauge} + * + * @param group Group for this ListGauge + * @param listGauge the ListGauge to register + * @tparam T the type of the list gauge + */ + def newListGauge[T](group: String, listGauge: ListGauge[T]) = { + debug("Adding new listgauge %s %s %s." 
format(group, listGauge.getName, listGauge)) + putAndGetGroup(group).putIfAbsent(listGauge.getName, listGauge) + val realListGauge = metrics.get(group).get(listGauge.getName).asInstanceOf[ListGauge[T]] + listeners.foreach(_.onListGauge(group, realListGauge)) + realListGauge + } + private def putAndGetGroup(group: String) = { metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric]) metrics.get(group) http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala index 7da8a9c..c601b29 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala @@ -20,19 +20,15 @@ package org.apache.samza.metrics.reporter import java.lang.management.ManagementFactory + import org.apache.samza.util.Logging import javax.management.MBeanServer import javax.management.ObjectName + import org.apache.samza.config.Config -import org.apache.samza.metrics.Counter -import org.apache.samza.metrics.Gauge -import org.apache.samza.metrics.Timer -import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.metrics.MetricsReporterFactory -import org.apache.samza.metrics.ReadableMetricsRegistry -import org.apache.samza.metrics.ReadableMetricsRegistryListener +import org.apache.samza.metrics._ + import scala.collection.JavaConverters._ -import org.apache.samza.metrics.MetricsVisitor import org.apache.samza.metrics.JmxUtil._ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { @@ -52,6 +48,8 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { def counter(counter: Counter) = registerBean(new 
JmxCounter(counter, getObjectName(group, name, sources(registry)))) def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry)))) def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry)))) + def listGauge[T](listGauge: ListGauge[T]) = registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, name, sources(registry)))) + }) } }) @@ -65,14 +63,15 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { def onCounter(group: String, counter: Counter) { registerBean(new JmxCounter(counter, getObjectName(group, counter.getName, source))) } - def onGauge(group: String, gauge: Gauge[_]) { registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source))) } - def onTimer(group: String, timer: Timer) { registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source))) } + def onListGauge(group: String, listGauge: ListGauge[_]) { + registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, listGauge.getName, source))) + } } } else { warn("Trying to re-register a registry for source %s. Ignoring." 
format source) @@ -110,6 +109,11 @@ class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extend def objectName = on } +class JmxListGauge(g: org.apache.samza.metrics.ListGauge[Object], on: ObjectName) extends JmxGaugeMBean { + def getValue = g.getValues + def objectName = on +} + trait JmxCounterMBean extends MetricMBean { def getCount(): Long } http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 65ca49c..d300e90 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -26,7 +26,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.system.SystemProducer import org.apache.samza.system.SystemStream import org.apache.samza.util.Logging - import java.util.HashMap import java.util.Map import java.util.concurrent.Executors @@ -83,15 +82,18 @@ class MetricsSnapshotReporter( } def stop = { - info("Stopping producer.") - producer.stop + // Scheduling an event with 0 delay to ensure flushing of metrics one last time before shutdown + executor.schedule(this,0, TimeUnit.SECONDS) info("Stopping reporter timer.") - + // Allow the scheduled task above to finish, and block for termination (for max 60 seconds) executor.shutdown executor.awaitTermination(60, TimeUnit.SECONDS) + info("Stopping producer.") + producer.stop + if (!executor.isTerminated) { warn("Unable to shutdown reporter timer.") } @@ -112,6 +114,8 @@ class MetricsSnapshotReporter( registry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new 
MetricsVisitor { + // for listGauge the value is returned as a list, which gets serialized + def listGauge[T](listGauge: ListGauge[T]) = {groupMsg.put(name, listGauge.getValues) } def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long) def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object]) def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double) @@ -133,12 +137,18 @@ class MetricsSnapshotReporter( metricsSnapshot } - producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized)) + try { + + producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized)) - // Always flush, since we don't want metrics to get batched up. - producer.flush(source) + // Always flush, since we don't want metrics to get batched up. + producer.flush(source) + } catch { + case e: Exception => error("Exception when flushing metrics for source %s " format(source), e) + } } + debug("Finished flushing metrics.") } } http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index b87e5ed..6fdcacc 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -52,7 +52,7 @@ import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.InitableFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.impl.store.TimestampedValue; +import 
org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java index 0315a20..94e171a 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java @@ -24,6 +24,7 @@ import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.ClosableIterator; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.util.TimestampedValue; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java index 40015ec..1621e73 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.impl.store; import org.apache.samza.serializers.ByteSerde; import org.apache.samza.serializers.IntegerSerde; +import 
org.apache.samza.util.TimestampedValue; import org.junit.Test; import java.nio.ByteBuffer; http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index 5c10987..c93591a 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -46,6 +46,9 @@ object ApplicationMasterRestServlet { metricsRegistry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor() { + def listGauge[T](listGauge: ListGauge[T]) = + groupMap.put(name, listGauge.getValues) + def counter(counter: Counter) = groupMap.put(counter.getName, counter.getCount: lang.Long)