http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3dd98e0..9e71300 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -38,10 +38,10 @@ import static org.mockito.Mockito.withSettings;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -145,7 +145,8 @@ public class ReduceFnRunnerTest {
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
-    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
         ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), 
mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
@@ -170,8 +171,7 @@ public class ReduceFnRunnerTest {
     // This element shouldn't be seen, because the trigger has finished
     injectElement(tester, 4);
 
-    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
-        MetricName.named(ReduceFnRunner.class,
+    long droppedElements = 
container.getCounter(MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();
     assertEquals(1, droppedElements);
@@ -423,7 +423,8 @@ public class ReduceFnRunnerTest {
 
   @Test
   public void testWatermarkHoldAndLateData() throws Exception {
-    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
     // Test handling of late data. Specifically, ensure the watermark hold is correct.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
         ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), 
mockTriggerStateMachine,
@@ -452,7 +453,7 @@ public class ReduceFnRunnerTest {
     // Holding for the end-of-window transition.
     assertEquals(new Instant(9), tester.getWatermarkHold());
     // Nothing dropped.
-    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+    long droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();
@@ -514,7 +515,7 @@ public class ReduceFnRunnerTest {
     // Because we're about to expire the window, we output it.
     
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     injectElement(tester, 8);
-    droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+    droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();
@@ -1083,7 +1084,8 @@ public class ReduceFnRunnerTest {
    */
   @Test
   public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
-    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
     ReduceFnTester<Integer, Integer, IntervalWindow> tester = 
ReduceFnTester.combining(
         WindowingStrategy.of(
             SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
@@ -1097,7 +1099,7 @@ public class ReduceFnRunnerTest {
         // assigned to [-30, 70), [0, 100), [30, 130)
         TimestampedValue.of(12, new Instant(40)));
 
-    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+    long droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();
@@ -1109,7 +1111,7 @@ public class ReduceFnRunnerTest {
         // but [-30, 70) is closed by the trigger
         TimestampedValue.of(14, new Instant(60)));
 
-    droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+    droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();
@@ -1120,7 +1122,7 @@ public class ReduceFnRunnerTest {
     // but they are all closed
     tester.injectElements(TimestampedValue.of(16, new Instant(40)));
 
-    droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+    droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();
@@ -1129,7 +1131,8 @@ public class ReduceFnRunnerTest {
 
   @Test
   public void testIdempotentEmptyPanesDiscarding() throws Exception {
-    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
     // Test uninteresting (empty) panes don't increment the index or otherwise
     // modify PaneInfo.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
@@ -1170,7 +1173,7 @@ public class ReduceFnRunnerTest {
     assertTrue(tester.isMarkedFinished(firstWindow));
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
 
-    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+    long droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();
@@ -1179,7 +1182,8 @@ public class ReduceFnRunnerTest {
 
   @Test
   public void testIdempotentEmptyPanesAccumulating() throws Exception {
-    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
     // Test uninteresting (empty) panes don't increment the index or otherwise
     // modify PaneInfo.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
@@ -1222,7 +1226,7 @@ public class ReduceFnRunnerTest {
     assertTrue(tester.isMarkedFinished(firstWindow));
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
 
-    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+    long droppedElements = container.getCounter(
         MetricName.named(ReduceFnRunner.class,
             ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
         .getCumulative().longValue();

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 76351e4..5172f43 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -25,10 +25,10 @@ import static org.mockito.Mockito.when;
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -93,7 +93,8 @@ public class StatefulDoFnRunnerTest {
 
   @Test
   public void testLateDropping() throws Exception {
-    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
 
     timerInternals.advanceInputWatermark(new 
Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
     timerInternals.advanceOutputWatermark(new 
Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -117,9 +118,8 @@ public class StatefulDoFnRunnerTest {
         WindowedValue.of(KV.of("hello", 1), timestamp, window, 
PaneInfo.NO_FIRING));
 
 
-    long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
-        MetricName.named(StatefulDoFnRunner.class,
-            
StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue();
+    long droppedValues = 
container.getCounter(MetricName.named(StatefulDoFnRunner.class,
+        
StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue();
     assertEquals(1L, droppedValues);
 
     runner.finishBundle();

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java
new file mode 100644
index 0000000..12906d9
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CounterCell}.
+ */
+@RunWith(JUnit4.class)
+public class CounterCellTest {
+
+  private CounterCell cell = new CounterCell(MetricName.named("hello", 
"world"));
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.inc(5);
+    cell.inc(7);
+    assertThat(cell.getCumulative(), equalTo(12L));
+    assertThat("getCumulative is idempotent", cell.getCumulative(), 
equalTo(12L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+    assertThat(cell.getCumulative(), equalTo(12L));
+
+    cell.inc(30);
+    assertThat(cell.getCumulative(), equalTo(42L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java
new file mode 100644
index 0000000..e5c8898
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DirtyState}.
+ */
+@RunWith(JUnit4.class)
+public class DirtyStateTest {
+
+  private final DirtyState dirty = new DirtyState();
+
+  @Test
+  public void basicPath() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), 
is(false));
+
+    dirty.afterModification();
+    assertThat("Should be dirty after change", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), 
is(false));
+  }
+
+  @Test
+  public void changeAfterBeforeCommit() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterModification();
+    dirty.afterCommit();
+    assertThat("Changes after beforeCommit should be dirty after afterCommit",
+        dirty.beforeCommit(), is(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java
new file mode 100644
index 0000000..c0b045a
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DistributionCell}.
+ */
+@RunWith(JUnit4.class)
+public class DistributionCellTest {
+  private DistributionCell cell = new 
DistributionCell(MetricName.named("hello", "world"));
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.update(5);
+    cell.update(7);
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 
7)));
+    assertThat("getCumulative is idempotent",
+        cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7)));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+    cell.update(30);
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 
30)));
+
+    assertThat("Adding a new value made the cell dirty",
+        cell.getDirty().beforeCommit(), equalTo(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java
new file mode 100644
index 0000000..cfaf92d
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GaugeCell}.
+ */
+public class GaugeCellTest {
+  private GaugeCell cell = new GaugeCell(MetricName.named("hello", "world"));
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.set(5);
+    cell.set(7);
+    assertThat(cell.getCumulative().value(), 
equalTo(GaugeData.create(7).value()));
+    assertThat("getCumulative is idempotent",
+        cell.getCumulative().value(), equalTo(7L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+    cell.set(30);
+    assertThat(cell.getCumulative().value(), equalTo(30L));
+
+    assertThat("Adding a new value made the cell dirty",
+        cell.getDirty().beforeCommit(), equalTo(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java
new file mode 100644
index 0000000..a5ee874
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.util.Objects;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matchers for {@link MetricUpdate}.
+ */
+public class MetricUpdateMatchers {
+
+  /**
+   * Matches a {@link MetricUpdate} with the given name and contents.
+   *
+   * <p>Visible since it may be used in runner-specific tests.
+   */
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, 
final T update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{name=").appendValue(name)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+
+  /**
+   * Matches a {@link MetricUpdate} with the given namespace, name, step and contents.
+   *
+   * <p>Visible since it may be used in runner-specific tests.
+   */
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(
+      final String namespace, final String name, final String step, final T 
update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(namespace, 
item.getKey().metricName().namespace())
+            && Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(step, item.getKey().stepName())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
new file mode 100644
index 0000000..b304d3b
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static 
org.apache.beam.runners.core.metrics.MetricUpdateMatchers.metricUpdate;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricsContainerImpl}.
+ */
+@RunWith(JUnit4.class)
+public class MetricsContainerImplTest {
+
+  @Test
+  public void testCounterDeltas() {
+    MetricsContainerImpl container = new MetricsContainerImpl("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    assertThat("All counters should start out dirty",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 0L),
+        metricUpdate("name2", 0L)));
+    container.commitUpdates();
+    assertThat("After commit no counters should be dirty",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    c1.inc(5L);
+    c2.inc(4L);
+
+    assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    assertThat("Since we haven't committed, updates are still included",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    container.commitUpdates();
+    assertThat("After commit there are no updates",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    c1.inc(8L);
+    assertThat(container.getUpdates().counterUpdates(), contains(
+        metricUpdate("name1", 13L)));
+  }
+
+  @Test
+  public void testCounterCumulatives() {
+    MetricsContainerImpl container = new MetricsContainerImpl("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    c1.inc(2L);
+    c2.inc(4L);
+    c1.inc(3L);
+
+    container.getUpdates();
+    container.commitUpdates();
+    assertThat("Committing updates shouldn't affect cumulative counter values",
+        container.getCumulative().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    c1.inc(8L);
+    assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 13L),
+        metricUpdate("name2", 4L)));
+  }
+
+  @Test
+  public void testDistributionDeltas() {
+    MetricsContainerImpl container = new MetricsContainerImpl("step1");
+    DistributionCell c1 = container.getDistribution(MetricName.named("ns", 
"name1"));
+    DistributionCell c2 = container.getDistribution(MetricName.named("ns", 
"name2"));
+
+    assertThat("Initial update includes initial zero-values",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.EMPTY),
+        metricUpdate("name2", DistributionData.EMPTY)));
+
+    container.commitUpdates();
+    assertThat("No updates after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    c1.update(5L);
+    c2.update(4L);
+
+    assertThat(container.getUpdates().distributionUpdates(), 
containsInAnyOrder(
+        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+    assertThat("Updates stay the same without commit",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+
+    container.commitUpdates();
+    assertThat("No updatess after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    c1.update(8L);
+    c1.update(4L);
+    assertThat(container.getUpdates().distributionUpdates(), contains(
+        metricUpdate("name1", DistributionData.create(17, 3, 4, 8))));
+    container.commitUpdates();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
new file mode 100644
index 0000000..3074cc0
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asMetricResults;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.hamcrest.collection.IsIterableWithSize;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for {@link MetricsContainerStepMap}.
+ *
+ * <p>All tests feed one shared, pre-populated {@link MetricsContainerImpl} into
+ * {@link MetricsContainerStepMap} instances and then query the resulting {@link MetricResults}.
+ * The expectations below rely on repeated {@code update} calls for the same step accumulating:
+ * the counter and the distribution grow with every call, while the gauge keeps its value.
+ */
+public class MetricsContainerStepMapTest {
+
+  private static final String NAMESPACE = MetricsContainerStepMapTest.class.getName();
+  private static final String STEP1 = "myStep1";
+  private static final String STEP2 = "myStep2";
+  private static final String COUNTER_NAME = "myCounter";
+  private static final String DISTRIBUTION_NAME = "myDistribution";
+  private static final String GAUGE_NAME = "myGauge";
+
+  private static final long VALUE = 100;
+
+  // These must stay declared before the static initializer below, which reports through them.
+  private static final Counter counter =
+      Metrics.counter(MetricsContainerStepMapTest.class, COUNTER_NAME);
+  private static final Distribution distribution =
+      Metrics.distribution(MetricsContainerStepMapTest.class, DISTRIBUTION_NAME);
+  private static final Gauge gauge =
+      Metrics.gauge(MetricsContainerStepMapTest.class, GAUGE_NAME);
+
+  /**
+   * Container shared by every test. It holds one counter increment of {@link #VALUE}, two
+   * distribution updates ({@link #VALUE} and twice {@link #VALUE}) and one gauge set to
+   * {@link #VALUE}.
+   */
+  private static final MetricsContainerImpl metricsContainer;
+
+  static {
+    metricsContainer = new MetricsContainerImpl(null);
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
+      counter.inc(VALUE);
+      distribution.update(VALUE);
+      distribution.update(VALUE * 2);
+      gauge.set(VALUE);
+    } catch (IOException e) {
+      // Fail class initialization loudly instead of printing the stack trace and carrying on.
+      throw new IllegalStateException("Unable to populate the shared metrics container", e);
+    }
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /** Attempted-only results expose per-step accumulated attempted values. */
+  @Test
+  public void testAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+
+    MetricResults metricResults = asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    // NOTE(review): the expected gauges are built with Instant.now(); presumably the matcher
+    // does not compare gauge timestamps exactly - confirm against MetricResultsMatchers.
+    MetricQueryResults step1res = queryStep(metricResults, STEP1);
+    assertMetricCounts(step1res, 1);
+    assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, false);
+    assertDistribution(
+        DISTRIBUTION_NAME, step1res, STEP1,
+        DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), false);
+    assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+
+    MetricQueryResults step2res = queryStep(metricResults, STEP2);
+    assertMetricCounts(step2res, 1);
+    assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, false);
+    assertDistribution(
+        DISTRIBUTION_NAME, step2res, STEP2,
+        DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), false);
+    assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+
+    // An unfiltered query returns one metric of each kind per step.
+    MetricQueryResults allres = metricResults.queryMetrics(MetricsFilter.builder().build());
+    assertMetricCounts(allres, 2);
+  }
+
+  /** Asking attempted-only results for a committed counter value is rejected. */
+  @Test
+  public void testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults = asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res = queryStep(metricResults, STEP1);
+
+    expectCommittedUnsupported();
+
+    assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, true);
+  }
+
+  /** Asking attempted-only results for a committed distribution value is rejected. */
+  @Test
+  public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults = asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res = queryStep(metricResults, STEP1);
+
+    expectCommittedUnsupported();
+
+    assertDistribution(DISTRIBUTION_NAME, step1res, STEP1, DistributionResult.ZERO, true);
+  }
+
+  /** Asking attempted-only results for a committed gauge value is rejected. */
+  @Test
+  public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults = asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res = queryStep(metricResults, STEP1);
+
+    expectCommittedUnsupported();
+
+    assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.empty(), true);
+  }
+
+  /** Attempted and committed values are tracked independently for each step. */
+  @Test
+  public void testAttemptedAndCommittedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+
+    MetricsContainerStepMap committedMetrics = new MetricsContainerStepMap();
+    committedMetrics.update(STEP1, metricsContainer);
+    committedMetrics.update(STEP2, metricsContainer);
+    committedMetrics.update(STEP2, metricsContainer);
+
+    MetricResults metricResults = asMetricResults(attemptedMetrics, committedMetrics);
+
+    MetricQueryResults step1res = queryStep(metricResults, STEP1);
+    assertMetricCounts(step1res, 1);
+
+    // STEP1: two attempted updates ...
+    assertCounter(COUNTER_NAME, step1res, STEP1, VALUE * 2, false);
+    assertDistribution(
+        DISTRIBUTION_NAME, step1res, STEP1,
+        DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), false);
+    assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+
+    // ... but only one committed update.
+    assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, true);
+    assertDistribution(
+        DISTRIBUTION_NAME, step1res, STEP1,
+        DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), true);
+    assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true);
+
+    MetricQueryResults step2res = queryStep(metricResults, STEP2);
+    assertMetricCounts(step2res, 1);
+
+    // STEP2: three attempted updates ...
+    assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 3, false);
+    assertDistribution(
+        DISTRIBUTION_NAME, step2res, STEP2,
+        DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2), false);
+    assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+
+    // ... and two committed updates.
+    assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, true);
+    assertDistribution(
+        DISTRIBUTION_NAME, step2res, STEP2,
+        DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), true);
+    assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true);
+
+    MetricQueryResults allres = metricResults.queryMetrics(MetricsFilter.builder().build());
+    assertMetricCounts(allres, 2);
+  }
+
+  /** Queries {@code metricResults} with a filter restricted to the given step. */
+  private MetricQueryResults queryStep(MetricResults metricResults, String step) {
+    return metricResults.queryMetrics(MetricsFilter.builder().addStep(step).build());
+  }
+
+  /** Registers the exception expected when committed values are requested but unsupported. */
+  private void expectCommittedUnsupported() {
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("This runner does not currently support committed metrics results.");
+  }
+
+  /** Asserts that counters, distributions and gauges each contain {@code size} results. */
+  private void assertMetricCounts(MetricQueryResults metricQueryResults, int size) {
+    assertIterableSize(metricQueryResults.counters(), size);
+    assertIterableSize(metricQueryResults.distributions(), size);
+    assertIterableSize(metricQueryResults.gauges(), size);
+  }
+
+  private <T> void assertIterableSize(Iterable<T> iterable, int size) {
+    assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size));
+  }
+
+  /**
+   * Asserts that the counters in {@code metricQueryResults} include one matching the given name,
+   * step and value; {@code isCommitted} selects the committed rather than the attempted value.
+   */
+  private void assertCounter(
+      String name,
+      MetricQueryResults metricQueryResults,
+      String step,
+      Long expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.counters(),
+        hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted)));
+  }
+
+  /** Distribution counterpart of {@link #assertCounter}. */
+  private void assertDistribution(
+      String name,
+      MetricQueryResults metricQueryResults,
+      String step,
+      DistributionResult expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.distributions(),
+        hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted)));
+  }
+
+  /** Gauge counterpart of {@link #assertCounter}. */
+  private void assertGauge(
+      String name,
+      MetricQueryResults metricQueryResults,
+      String step,
+      GaugeResult expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.gauges(),
+        hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java
new file mode 100644
index 0000000..96210dd
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+/**
+ * Tests for {@link MetricsMap}.
+ */
+@RunWith(JUnit4.class)
+public class MetricsMapTest {
+
+  /** Map under test; its factory hands out a new {@link AtomicLong} on every call. */
+  private final MetricsMap<String, AtomicLong> metricsMap =
+      new MetricsMap<>(
+          new MetricsMap.Factory<String, AtomicLong>() {
+            @Override
+            public AtomicLong createInstance(String unusedKey) {
+              return new AtomicLong();
+            }
+          });
+
+  /** Different keys must be backed by different instances. */
+  @Test
+  public void testCreateSeparateInstances() {
+    AtomicLong foo = metricsMap.get("foo");
+    AtomicLong bar = metricsMap.get("bar");
+
+    assertThat(foo, not(sameInstance(bar)));
+  }
+
+  /** Looking up the same key twice must return the very same instance. */
+  @Test
+  public void testReuseInstances() {
+    AtomicLong foo1 = metricsMap.get("foo");
+    AtomicLong foo2 = metricsMap.get("foo");
+
+    assertThat(foo1, sameInstance(foo2));
+  }
+
+  /** {@code tryGet} yields null until {@code get} has created the entry. */
+  @Test
+  public void testGet() {
+    assertThat(metricsMap.tryGet("foo"), nullValue(AtomicLong.class));
+
+    AtomicLong foo = metricsMap.get("foo");
+    assertThat(metricsMap.tryGet("foo"), sameInstance(foo));
+  }
+
+  /** {@code entries} exposes every key together with the instance created for it. */
+  @Test
+  public void testGetEntries() {
+    AtomicLong foo = metricsMap.get("foo");
+    AtomicLong bar = metricsMap.get("bar");
+    assertThat(metricsMap.entries(), containsInAnyOrder(
+        hasEntry("foo", foo),
+        hasEntry("bar", bar)));
+  }
+
+  /**
+   * Returns a matcher accepting a {@link Map.Entry} whose key and value are both equal, per
+   * {@link Objects#equals(Object, Object)}, to the given ones.
+   */
+  private static Matcher<Map.Entry<String, AtomicLong>> hasEntry(
+      final String key, final AtomicLong value) {
+    return new TypeSafeMatcher<Entry<String, AtomicLong>>() {
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("Map.Entry{key=").appendValue(key)
+            .appendText(", value=").appendValue(value)
+            .appendText("}");
+      }
+
+      @Override
+      protected boolean matchesSafely(Entry<String, AtomicLong> item) {
+        return Objects.equals(key, item.getKey())
+            && Objects.equals(value, item.getValue());
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index b7cd6e7..8169332 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -32,20 +32,20 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.GaugeData;
+import org.apache.beam.runners.core.metrics.MetricFiltering;
+import org.apache.beam.runners.core.metrics.MetricKey;
+import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.runners.core.metrics.MetricsMap;
 import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.GaugeData;
 import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricFiltering;
-import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricUpdates;
-import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsFilter;
-import org.apache.beam.sdk.metrics.MetricsMap;
 
 /**
  * Implementation of {@link MetricResults} for the Direct Runner.

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index c7f7847..6efb99f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -23,9 +23,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
+import org.apache.beam.runners.core.metrics.MetricUpdates;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 56f8650..76d817b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -21,8 +21,8 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
-import org.apache.beam.sdk.metrics.MetricUpdates;
-import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -92,7 +92,7 @@ class TransformExecutor<T> implements Runnable {
 
   @Override
   public void run() {
-    MetricsContainer metricsContainer = new 
MetricsContainer(transform.getFullName());
+    MetricsContainerImpl metricsContainer = new 
MetricsContainerImpl(transform.getFullName());
     try (Closeable metricsScope = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
       Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
       for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
@@ -134,7 +134,7 @@ class TransformExecutor<T> implements Runnable {
    */
   private void processElements(
       TransformEvaluator<T> evaluator,
-      MetricsContainer metricsContainer,
+      MetricsContainerImpl metricsContainer,
       Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
     if (inputBundle != null) {
@@ -167,7 +167,7 @@ class TransformExecutor<T> implements Runnable {
    *         {@link TransformEvaluator#finishBundle()}
    */
   private TransformResult<T> finishBundle(
-      TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
+      TransformEvaluator<T> evaluator, MetricsContainerImpl metricsContainer,
       Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
     TransformResult<T> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 8d7aeda..554c2d6 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.direct;
 
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.MetricUpdates;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index d5d0aff..8cdf323 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -18,23 +18,23 @@
 
 package org.apache.beam.runners.direct;
 
-import static 
org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
-import static 
org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
 import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
+import static 
org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
+import static 
org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.GaugeData;
+import org.apache.beam.runners.core.metrics.MetricKey;
+import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.GaugeData;
 import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricUpdates;
-import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.joda.time.Instant;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 038895a..b1f956d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -17,15 +17,15 @@
  */
 package org.apache.beam.runners.flink;
 
-import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.joda.time.Duration;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index 40191d2..ccce127 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.metrics;
 import java.io.Closeable;
 import java.io.IOException;
 import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -29,7 +30,7 @@ import org.joda.time.Instant;
 
 /**
  * {@link DoFnRunner} decorator which registers
- * {@link org.apache.beam.sdk.metrics.MetricsContainer}. It updates metrics to 
Flink metrics and
+ * {@link MetricsContainerImpl}. It updates metrics to Flink metrics and
  * accumulators in {@link #finishBundle()}.
  */
 public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT> {

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index f81205e..677051e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,17 +17,18 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
-import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -37,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Helper class for holding a {@link MetricsContainer} and forwarding Beam 
metrics to
+ * Helper class for holding a {@link MetricsContainerImpl} and forwarding Beam 
metrics to
  * Flink accumulators and metrics.
  */
 public class FlinkMetricContainer {

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
index a9dc2ce..d8b5d25 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.SimpleAccumulator;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
index 64738cc..85d6426 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.metrics;
 
 import java.io.Closeable;
 import java.io.IOException;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -26,8 +27,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * Util for invoking {@link Source.Reader} methods that might require a
- * {@link org.apache.beam.sdk.metrics.MetricsContainer} to be active.
- * Source.Reader decorator which registers {@link 
org.apache.beam.sdk.metrics.MetricsContainer}.
+ * {@link MetricsContainerImpl} to be active.
+ * Source.Reader decorator which registers {@link MetricsContainerImpl}.
  * It update metrics to Flink metric and accumulator in start and advance.
  */
 public class ReaderInvocationUtil<OutputT, ReaderT extends 
Source.Reader<OutputT>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index 3809335..f679976 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170508-pr2933</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170510</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index aa80959..a6a6a43 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -28,10 +28,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.MetricFiltering;
+import org.apache.beam.runners.core.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricFiltering;
-import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index bca3428..85a0979 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
-import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index 3986e33..7fd6962 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.spark;
 
-import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import java.io.IOException;
 import java.util.Objects;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index aa89c59..01cc176 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -28,13 +28,13 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.Accumulator;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 2a9de4b..7106c73 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index 1dcfa2f..03b2187 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -21,10 +21,10 @@ package org.apache.beam.runners.spark.metrics;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import java.io.IOException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.Accumulator;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
index dee4ebc..17cf71f 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.spark.metrics;
 
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.spark.AccumulatorParam;
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
index e4bd598..5e67445 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -18,19 +18,19 @@
 
 package org.apache.beam.runners.spark.metrics;
 
-import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import com.codahale.metrics.Metric;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 37d9635..1d843f9 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -31,6 +31,8 @@ import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
 import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.metrics.CounterCell;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -43,9 +45,7 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.metrics.CounterCell;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -207,7 +207,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
             new OutputWindowedValueHolder<>();
         // use in memory Aggregators since Spark Accumulators are not resilient
         // in stateful operators, once done with this partition.
-        final MetricsContainer cellProvider = new MetricsContainer("cellProvider");
+        final MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider");
         final CounterCell droppedDueToClosedWindow = cellProvider.getCounter(
             MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class,
             GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER));
@@ -321,12 +321,12 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         long lateDropped = droppedDueToLateness.getCumulative();
         if (lateDropped > 0) {
           LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
-          droppedDueToLateness.update(-droppedDueToLateness.getCumulative());
+          droppedDueToLateness.inc(-droppedDueToLateness.getCumulative());
         }
         long closedWindowDropped = droppedDueToClosedWindow.getCumulative();
         if (closedWindowDropped > 0) {
           LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
-          droppedDueToClosedWindow.update(-droppedDueToClosedWindow.getCumulative());
+          droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative());
         }
 
         return scala.collection.JavaConversions.asScalaIterator(outIter);

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 17a3c73..549bd30 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index 8349b09..394b80b 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -21,8 +21,9 @@ package org.apache.beam.runners.spark.translation;
 import java.io.Closeable;
 import java.io.IOException;
 import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -32,7 +33,7 @@ import org.joda.time.Instant;
 
 
 /**
- * DoFnRunner decorator which registers {@link org.apache.beam.sdk.metrics.MetricsContainer}.
+ * DoFnRunner decorator which registers {@link MetricsContainerImpl}.
  */
 class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
   private final DoFnRunner<InputT, OutputT> delegate;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index ecf96b6..4a66541 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -27,10 +27,10 @@ import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index acbac32..b2ed3a9 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -30,6 +30,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -41,7 +42,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index f736e53..acb4a02 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.Nonnull;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -58,7 +59,6 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
index 86f35ba..5728fa0 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -91,7 +91,7 @@ public class ReaderToIteratorAdapterTest {
   private final TestReader testReader = new TestReader();
 
   private final SourceRDD.Bounded.ReaderToIteratorAdapter<Integer> readerIterator =
-      new SourceRDD.Bounded.ReaderToIteratorAdapter<>(new MetricsContainer(""), testReader);
+      new SourceRDD.Bounded.ReaderToIteratorAdapter<>(new MetricsContainerImpl(""), testReader);
 
   private void assertReaderRange(final int start, final int end) {
     for (int i = start; i < end; i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 584edac..254a318 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;

http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index df6027c..cafd539 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.spark.translation.streaming;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
 

Reply via email to