Repository: samza
Updated Branches:
  refs/heads/master 7fc39e521 -> f6a2d54da


Add a new ListGauge metric-type

This PR introduces a ListGauge type,
A subsequent PR: https://github.com/apache/samza/pull/543 shows how this issued 
in conjunction with a diagnostics appender for error-tracking and dumpting to 
kafka via SnapshotReporter.

Author: Ray Matharu <rmath...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>, Cameron Lee <ca...@linkedin.com>

Closes #541 from rayman7718/listgauge


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f6a2d54d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f6a2d54d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f6a2d54d

Branch: refs/heads/master
Commit: f6a2d54dab51915aeb99d62b7c59523d543e42b7
Parents: 7fc39e5
Author: Ray Matharu <rmath...@linkedin.com>
Authored: Wed Jun 13 21:01:53 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Wed Jun 13 21:01:53 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/metrics/ListGauge.java     | 143 +++++++++++++++++++
 .../apache/samza/metrics/MetricsRegistry.java   |   9 ++
 .../apache/samza/metrics/MetricsVisitor.java    |   7 +-
 .../ReadableMetricsRegistryListener.java        |   2 +
 .../apache/samza/util/NoOpMetricsRegistry.java  |   7 +
 .../org/apache/samza/util/TimestampedValue.java |  61 ++++++++
 .../org/apache/samza/metrics/TestListGauge.java |  99 +++++++++++++
 .../org/apache/samza/metrics/TestTimer.java     |   5 +-
 .../system/eventhub/TestMetricsRegistry.java    |  16 ++-
 .../org/apache/samza/metrics/MetricGroup.java   |   4 +
 .../functions/PartialJoinFunction.java          |   2 +-
 .../samza/operators/impl/OperatorImplGraph.java |   2 +-
 .../operators/impl/PartialJoinOperatorImpl.java |   2 +-
 .../operators/impl/WindowOperatorImpl.java      |   2 +-
 .../operators/impl/store/TimeSeriesStore.java   |   2 +
 .../impl/store/TimeSeriesStoreImpl.java         |   1 +
 .../operators/impl/store/TimestampedValue.java  |  61 --------
 .../impl/store/TimestampedValueSerde.java       |   1 +
 .../samza/operators/spec/JoinOperatorSpec.java  |   2 +-
 .../apache/samza/container/SamzaContainer.scala |   1 +
 .../samza/container/SamzaContainerMetrics.scala |   2 +
 .../apache/samza/metrics/MetricsHelper.scala    |   6 +-
 .../samza/metrics/MetricsRegistryMap.scala      |  15 ++
 .../samza/metrics/reporter/JmxReporter.scala    |  24 ++--
 .../reporter/MetricsSnapshotReporter.scala      |  24 +++-
 .../operators/impl/TestOperatorImplGraph.java   |   2 +-
 .../impl/store/TestTimeSeriesStoreImpl.java     |   1 +
 .../impl/store/TestTimestampedValueSerde.java   |   1 +
 .../webapp/ApplicationMasterRestServlet.scala   |   3 +
 29 files changed, 413 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java 
b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
new file mode 100644
index 0000000..545fd45
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+import org.apache.samza.util.TimestampedValue;
+
+
+/**
+ * A {@link ListGauge} is a {@link Metric} that buffers multiple instances of 
a type T in a list.
+ * {@link ListGauge}s are useful for maintaining, recording, or collecting 
values over time.
+ * For example, a set of specific logging-events (e.g., errors).
+ *
+ * Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), 
which are set during instantiation.
+ * Eviction happens during element addition or during reads of the ListGauge 
(getValues).
+ *
+ * All public methods are thread-safe.
+ *
+ */
+public class ListGauge<T> implements Metric {
+  private final String name;
+  private final Queue<TimestampedValue<T>> elements;
+
+  private final int maxNumberOfItems;
+  private final Duration maxStaleness;
+  private final static int DEFAULT_MAX_NITEMS = 1000;
+  private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60);
+
+  /**
+   * Create a new {@link ListGauge} that auto evicts based on the given 
maxNumberOfItems, maxStaleness, and period parameters.
+   *
+   * @param name Name to be assigned
+   * @param maxNumberOfItems The max number of items that can remain in the 
listgauge
+   * @param maxStaleness The max staleness of items permitted in the listgauge
+   */
+  public ListGauge(String name, int maxNumberOfItems, Duration maxStaleness) {
+    this.name = name;
+    this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>();
+    this.maxNumberOfItems = maxNumberOfItems;
+    this.maxStaleness = maxStaleness;
+  }
+
+  /**
+   * Create a new {@link ListGauge} that auto evicts upto a max of 100 items 
and a max-staleness of 60 minutes.
+   * @param name Name to be assigned
+   */
+  public ListGauge(String name) {
+    this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS);
+  }
+
+  /**
+   * Get the name assigned to this {@link ListGauge}
+   * @return the assigned name
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Get the Collection of values currently in the list.
+   * @return the collection of values
+   */
+  public Collection<T> getValues() {
+    this.evict();
+    return Collections.unmodifiableList(this.elements.stream().map(x -> 
x.getValue()).collect(Collectors.toList()));
+  }
+
+  /**
+   * Add a value to the list.
+   * (Timestamp assigned to this value is the current timestamp.)
+   * @param value The Gauge value to be added
+   */
+  public void add(T value) {
+    this.elements.add(new TimestampedValue<T>(value, 
Instant.now().toEpochMilli()));
+
+    // perform any evictions that may be needed.
+    this.evict();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void visit(MetricsVisitor visitor) {
+    visitor.listGauge(this);
+  }
+
+  /**
+   * Evicts entries from the elements list, based on the given item-size and 
durationThreshold.
+   */
+  private void evict() {
+    this.evictBasedOnSize();
+    this.evictBasedOnTimestamp();
+  }
+
+  /**
+   * Evicts entries from elements in FIFO order until it has maxNumberOfItems
+   */
+  private void evictBasedOnSize() {
+    int numToEvict = this.elements.size() - this.maxNumberOfItems;
+    while (numToEvict > 0) {
+      this.elements.poll(); // remove head
+      numToEvict--;
+    }
+  }
+
+  /**
+   * Removes entries from elements to ensure no element has a timestamp more 
than maxStaleness before current timestamp.
+   */
+  private void evictBasedOnTimestamp() {
+    Instant currentTimestamp = Instant.now();
+    TimestampedValue<T> valueInfo = this.elements.peek();
+
+    // continue remove-head if currenttimestamp - head-element's timestamp > 
durationThreshold
+    while (valueInfo != null
+        && currentTimestamp.toEpochMilli() - valueInfo.getTimestamp() > 
this.maxStaleness.toMillis()) {
+      this.elements.poll();
+      valueInfo = this.elements.peek();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java 
b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index 5a00d01..fa0fd39 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -65,6 +65,15 @@ public interface MetricsRegistry {
   <T> Gauge<T> newGauge(String group, Gauge<T> value);
 
   /**
+   * Register a {@link org.apache.samza.metrics.ListGauge}
+   * @param group Group for this ListGauge
+   * @param listGauge the ListGauge to register
+   * @param <T> Type of the ListGauge
+   * @return ListGauge registered
+   */
+  <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge);
+
+  /**
    * Create and Register a new {@link org.apache.samza.metrics.Timer}
    * @param group Group for this Timer
    * @param name Name of to-be-created Timer

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java 
b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
index 75abfe7..49a0929 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
@@ -31,8 +31,13 @@ public abstract class MetricsVisitor {
 
   public abstract void timer(Timer timer);
 
+  public abstract <T> void listGauge(ListGauge<T> listGauge);
+
   public void visit(Metric metric) {
-    if (metric instanceof Counter) {
+    // Cast for metrics of type ListGauge
+    if (metric instanceof ListGauge<?>) {
+      listGauge((ListGauge<?>) metric);
+    } else if (metric instanceof Counter) {
       counter((Counter) metric);
     } else if (metric instanceof Gauge<?>) {
       gauge((Gauge<?>) metric);

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
 
b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
index 739d68f..ba5b182 100644
--- 
a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
+++ 
b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
@@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener {
 
   void onGauge(String group, Gauge<?> gauge);
 
+  void onListGauge(String group, ListGauge<?> listGauge);
+
   void onTimer(String group, Timer timer);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java 
b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index 3df855c..76b8216 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -21,9 +21,11 @@ package org.apache.samza.util;
 
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 
+
 /**
  * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no 
actual metrics need to be
  * recorded but a registry is still required.
@@ -50,6 +52,11 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
   }
 
   @Override
+  public <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge) {
+    return listGauge;
+  }
+
+  @Override
   public Timer newTimer(String group, String name) {
     return new Timer(name);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java 
b/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java
new file mode 100644
index 0000000..e767918
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.util;
+
+/**
+ * An immutable pair of a value, and its corresponding timestamp.
+ *
+ * @param <V> the type of the value
+ */
+public class TimestampedValue<V> {
+  private final V value;
+  private final long timestamp;
+
+  public TimestampedValue(V v, long timestamp) {
+    this.value = v;
+    this.timestamp = timestamp;
+  }
+
+  public V getValue() {
+    return value;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || !getClass().equals(o.getClass())) return false;
+
+    TimestampedValue<?> that = (TimestampedValue<?>) o;
+
+    if (timestamp != that.timestamp) return false;
+    return value != null ? value.equals(that.value) : (that.value == null);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = value != null ? value.hashCode() : 0;
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java 
b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
new file mode 100644
index 0000000..eb91012
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Iterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Class to encapsulate test-cases for {@link 
org.apache.samza.metrics.ListGauge}
+ */
+public class TestListGauge {
+
+  private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10);
+
+  private <T> ListGauge<T> getListGaugeForTest() {
+    return new ListGauge<T>("sampleListGauge", 10, Duration.ofSeconds(60));
+  }
+
+  @Test
+  public void basicTest() {
+    ListGauge<String> listGauge = getListGaugeForTest();
+    listGauge.add("sampleValue");
+    Assert.assertEquals("Names should be the same", listGauge.getName(), 
"sampleListGauge");
+    Assert.assertEquals("List sizes should match", 
listGauge.getValues().size(), 1);
+    Assert.assertEquals("ListGauge should contain sampleGauge", 
listGauge.getValues().contains("sampleValue"), true);
+  }
+
+  @Test
+  public void testSizeEnforcement() {
+    ListGauge listGauge = getListGaugeForTest();
+    for (int i = 15; i > 0; i--) {
+      listGauge.add("v" + i);
+    }
+    Assert.assertEquals("List sizes should be as configured at creation time", 
listGauge.getValues().size(), 10);
+
+    int valueIndex = 10;
+    Collection<String> currentList = listGauge.getValues();
+    Iterator iterator = currentList.iterator();
+    while (iterator.hasNext()) {
+      String gaugeValue = (String) iterator.next();
+      Assert.assertTrue(gaugeValue.equals("v" + valueIndex));
+      valueIndex--;
+    }
+  }
+
+  @Test
+  public void testThreadSafety() throws InterruptedException {
+    ListGauge<Integer> listGauge = getListGaugeForTest();
+
+    Thread thread1 = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        for (int i = 1; i <= 100; i++) {
+          listGauge.add(i);
+        }
+      }
+    });
+
+    Thread thread2 = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        for (int i = 1; i <= 100; i++) {
+          listGauge.add(i);
+        }
+      }
+    });
+
+    thread1.start();
+    thread2.start();
+
+    thread1.join(THREAD_TEST_TIMEOUT.toMillis());
+    thread2.join(THREAD_TEST_TIMEOUT.toMillis());
+
+    Assert.assertTrue("ListGauge should have the last 10 values", 
listGauge.getValues().size() == 10);
+    for (Integer gaugeValue : listGauge.getValues()) {
+      Assert.assertTrue("Values should have the last 10 range", gaugeValue <= 
100 && gaugeValue > 90);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java 
b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
index 8076e02..c694d3f 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -19,13 +19,12 @@
 
 package org.apache.samza.metrics;
 
-import static org.junit.Assert.*;
-
 import java.util.Arrays;
-
 import org.apache.samza.util.Clock;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestTimer {
 
   /*

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
index d29b975..01b69ed 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
@@ -19,20 +19,21 @@
 
 package org.apache.samza.system.eventhub;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.collections4.map.HashedMap;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 public class TestMetricsRegistry implements MetricsRegistry {
 
   private Map<String, List<Counter>> counters = new HashedMap<>();
   private Map<String, List<Gauge<?>>> gauges = new HashedMap<>();
+  private Map<String, List<ListGauge>> listGauges = new HashedMap<>();
 
   public List<Counter> getCounters(String groupName) {
     return counters.get(groupName);
@@ -69,6 +70,13 @@ public class TestMetricsRegistry implements MetricsRegistry {
   }
 
   @Override
+  public ListGauge newListGauge(String group, ListGauge listGauge) {
+    listGauges.putIfAbsent(group, new ArrayList());
+    listGauges.get(group).add(listGauge);
+    return listGauge;
+  }
+
+  @Override
   public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
     if (!gauges.containsKey(group)) {
       gauges.put(group, new ArrayList<>());

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
index 53526d8..fc57846 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
@@ -44,6 +44,10 @@ public class MetricGroup {
     return registry.newCounter(groupName, (prefix + name).toLowerCase());
   }
 
+  public <T> ListGauge<T> newListGauge(String name) {
+    return registry.newListGauge(groupName, new ListGauge(name));
+  }
+
   public <T> Gauge<T> newGauge(String name, T value) {
     return registry.newGauge(groupName, new Gauge<T>((prefix + 
name).toLowerCase(), value));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
 
b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
index 5ede5e8..038abba 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.storage.kv.KeyValueStore;
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 0f51798..df73e48 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.BroadcastOperatorSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index 90a71a0..0cdde49 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -21,7 +21,7 @@ package org.apache.samza.operators.impl;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.storage.kv.KeyValueStore;

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 6b5baae..82dc0bf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKey;
 import org.apache.samza.operators.impl.store.TimeSeriesStore;
 import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.triggers.FiringType;

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
index f3d6948..c9b694d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
@@ -21,6 +21,8 @@
 package org.apache.samza.operators.impl.store;
 
 import org.apache.samza.storage.kv.ClosableIterator;
+import org.apache.samza.util.TimestampedValue;
+
 
 /**
  * A key-value store that allows entries to be queried and stored based on 
time ranges.

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
index f03d396..10a5967 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -23,6 +23,7 @@ import org.apache.samza.storage.kv.ClosableIterator;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.util.TimestampedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
deleted file mode 100644
index 5e45148..0000000
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.samza.operators.impl.store;
-
-/**
- * An immutable pair of a value, and its corresponding timestamp.
- *
- * @param <V> the type of the value
- */
-public class TimestampedValue<V> {
-  private final V value;
-  private final long timestamp;
-
-  public TimestampedValue(V v, long timestamp) {
-    this.value = v;
-    this.timestamp = timestamp;
-  }
-
-  public V getValue() {
-    return value;
-  }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || !getClass().equals(o.getClass())) return false;
-
-    TimestampedValue<?> that = (TimestampedValue<?>) o;
-
-    if (timestamp != that.timestamp) return false;
-    return value != null ? value.equals(that.value) : (that.value == null);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = value != null ? value.hashCode() : 0;
-    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
index b14f8a4..5b0cdac 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl.store;
 import org.apache.samza.serializers.Serde;
 
 import java.nio.ByteBuffer;
+import org.apache.samza.util.TimestampedValue;
 
 
 public class TimestampedValueSerde<V> implements Serde<TimestampedValue<V>> {

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index a218135..1b55784 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -23,7 +23,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.serializers.Serde;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 38d0d9c..be0fb26 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -30,6 +30,7 @@ import java.util.concurrent.{ExecutorService, Executors, 
ScheduledExecutorServic
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index c122956..a26e666 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -48,6 +48,8 @@ class SamzaContainerMetrics(
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new 
util.HashMap[TaskName, Gauge[Long]]()
 
+  val exceptions = newListGauge[String]("exceptions")
+
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" 
format(taskName.toString, storeName), -1L))
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 1520b0e..21ec763 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -40,9 +40,11 @@ trait MetricsHelper {
 
   def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value)
 
+  def newListGauge[T](name: String) = metricGroup.newListGauge[T](name)
+
   /**
-   * Specify a dynamic gauge that always returns the latest value when polled. 
-   * The value closure must be thread safe, since metrics reporters may access 
+   * Specify a dynamic gauge that always returns the latest value when polled.
+   * The value closure must be thread safe, since metrics reporters may access
    * it from another thread.
    */
   def newGauge[T](name: String, value: () => T) = {

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala 
b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index 40ffee2..75ed6aa 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -75,6 +75,21 @@ class MetricsRegistryMap(val name: String) extends 
ReadableMetricsRegistry with
     newTimer(group, new Timer(name))
   }
 
+  /**
+    * Register a {@link org.apache.samza.metrics.ListGauge}
+    *
+    * @param group     Group for this ListGauge
+    * @param listGauge the ListGauge to register
+    * @tparam T the type of the list gauge
+    */
+  def newListGauge[T](group: String, listGauge: ListGauge[T]) = {
+    debug("Adding new listgauge %s %s %s." format(group, listGauge.getName, 
listGauge))
+    putAndGetGroup(group).putIfAbsent(listGauge.getName, listGauge)
+    val realListGauge = 
metrics.get(group).get(listGauge.getName).asInstanceOf[ListGauge[T]]
+    listeners.foreach(_.onListGauge(group, realListGauge))
+    realListGauge
+  }
+
   private def putAndGetGroup(group: String) = {
     metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric])
     metrics.get(group)

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index 7da8a9c..c601b29 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -20,19 +20,15 @@
 package org.apache.samza.metrics.reporter
 
 import java.lang.management.ManagementFactory
+
 import org.apache.samza.util.Logging
 import javax.management.MBeanServer
 import javax.management.ObjectName
+
 import org.apache.samza.config.Config
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Gauge
-import org.apache.samza.metrics.Timer
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.metrics.ReadableMetricsRegistryListener
+import org.apache.samza.metrics._
+
 import scala.collection.JavaConverters._
-import org.apache.samza.metrics.MetricsVisitor
 import org.apache.samza.metrics.JmxUtil._
 
 class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
@@ -52,6 +48,8 @@ class JmxReporter(server: MBeanServer) extends 
MetricsReporter with Logging {
               def counter(counter: Counter) = registerBean(new 
JmxCounter(counter, getObjectName(group, name, sources(registry))))
               def gauge[T](gauge: Gauge[T]) = registerBean(new 
JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, 
sources(registry))))
               def timer(timer: Timer) = registerBean(new JmxTimer(timer, 
getObjectName(group, name, sources(registry))))
+              def listGauge[T](listGauge: ListGauge[T]) = registerBean(new 
JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, 
name, sources(registry))))
+
             })
         }
       })
@@ -65,14 +63,15 @@ class JmxReporter(server: MBeanServer) extends 
MetricsReporter with Logging {
         def onCounter(group: String, counter: Counter) {
           registerBean(new JmxCounter(counter, getObjectName(group, 
counter.getName, source)))
         }
-
         def onGauge(group: String, gauge: Gauge[_]) {
           registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], 
getObjectName(group, gauge.getName, source)))
         }
-
         def onTimer(group: String, timer: Timer) {
           registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, 
source)))
         }
+        def onListGauge(group: String, listGauge: ListGauge[_]) {
+          registerBean(new 
JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, 
listGauge.getName, source)))
+        }
       }
     } else {
       warn("Trying to re-register a registry for source %s. Ignoring." format 
source)
@@ -110,6 +109,11 @@ class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], 
on: ObjectName) extend
   def objectName = on
 }
 
+class JmxListGauge(g: org.apache.samza.metrics.ListGauge[Object], on: 
ObjectName) extends JmxGaugeMBean {
+  def getValue = g.getValues
+  def objectName = on
+}
+
 trait JmxCounterMBean extends MetricMBean {
   def getCount(): Long
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 65ca49c..d300e90 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -26,7 +26,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.SystemStream
 import org.apache.samza.util.Logging
-
 import java.util.HashMap
 import java.util.Map
 import java.util.concurrent.Executors
@@ -83,15 +82,18 @@ class MetricsSnapshotReporter(
   }
 
   def stop = {
-    info("Stopping producer.")
 
-    producer.stop
+    // Scheduling an event with 0 delay to ensure flushing of metrics one last 
time before shutdown
+    executor.schedule(this,0, TimeUnit.SECONDS)
 
     info("Stopping reporter timer.")
-
+    // Allow the scheduled task above to finish, and block for termination 
(for max 60 seconds)
     executor.shutdown
     executor.awaitTermination(60, TimeUnit.SECONDS)
 
+    info("Stopping producer.")
+    producer.stop
+
     if (!executor.isTerminated) {
       warn("Unable to shutdown reporter timer.")
     }
@@ -112,6 +114,8 @@ class MetricsSnapshotReporter(
         registry.getGroup(group).asScala.foreach {
           case (name, metric) =>
             metric.visit(new MetricsVisitor {
+              // for listGauge the value is returned as a list, which gets 
serialized
+              def listGauge[T](listGauge: ListGauge[T]) = {groupMsg.put(name, 
listGauge.getValues)  }
               def counter(counter: Counter) = groupMsg.put(name, 
counter.getCount: java.lang.Long)
               def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, 
gauge.getValue.asInstanceOf[Object])
               def timer(timer: Timer) = groupMsg.put(name, 
timer.getSnapshot().getAverage(): java.lang.Double)
@@ -133,12 +137,18 @@ class MetricsSnapshotReporter(
         metricsSnapshot
       }
 
-      producer.send(source, new OutgoingMessageEnvelope(out, host, null, 
maybeSerialized))
+      try {
+
+        producer.send(source, new OutgoingMessageEnvelope(out, host, null, 
maybeSerialized))
 
-      // Always flush, since we don't want metrics to get batched up.
-      producer.flush(source)
+        // Always flush, since we don't want metrics to get batched up.
+        producer.flush(source)
+      } catch  {
+        case e: Exception => error("Exception when flushing metrics for source 
%s " format(source), e)
+      }
     }
 
+
     debug("Finished flushing metrics.")
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index b87e5ed..6fdcacc 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -52,7 +52,7 @@ import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.InitableFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
index 0315a20..94e171a 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
@@ -24,6 +24,7 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.ClosableIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.util.TimestampedValue;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
index 40015ec..1621e73 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.impl.store;
 
 import org.apache.samza.serializers.ByteSerde;
 import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.util.TimestampedValue;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/samza/blob/f6a2d54d/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 5c10987..c93591a 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -46,6 +46,9 @@ object ApplicationMasterRestServlet {
       metricsRegistry.getGroup(group).asScala.foreach {
         case (name, metric) =>
           metric.visit(new MetricsVisitor() {
+            def listGauge[T](listGauge: ListGauge[T]) =
+              groupMap.put(name, listGauge.getValues)
+
             def counter(counter: Counter) =
               groupMap.put(counter.getName, counter.getCount: lang.Long)
 

Reply via email to