http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
deleted file mode 100644
index 4e0c15c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * Atomically tracks the dirty-state of a metric.
- *
- * <p>Reporting an update is split into two parts such that only changes made 
before the call to
- * {@link #beforeCommit()} are committed when {@link #afterCommit()} is 
invoked. This allows for
- * a two-step commit process of gathering all the dirty updates (calling 
{#link beforeCommit()})
- * followed by committing and calling {#link afterCommit()}.
- *
- * <p>The tracking of dirty states is done conservatively -- sometimes {@link 
#beforeCommit()}
- * will return true (indicating a dirty metric) even if there have been no 
changes since the last
- * commit.
- *
- * <p>There is also a possible race when the underlying metric is modified but 
the call to
- * {@link #afterModification()} hasn't happened before the call to {@link 
#beforeCommit()}. In this
- * case the next round of metric updating will see the changes. If this was 
for the final commit,
- * then the metric updates shouldn't be extracted until all possible user 
modifications have
- * completed.
- */
-@Experimental(Kind.METRICS)
-class DirtyState implements Serializable {
-  private enum State {
-    /** Indicates that there have been changes to the MetricCell since last 
commit. */
-    DIRTY,
-    /** Indicates that there have been no changes to the MetricCell since last 
commit. */
-    CLEAN,
-    /** Indicates that a commit of the current value is in progress. */
-    COMMITTING
-  }
-
-  private final AtomicReference<State> dirty = new 
AtomicReference<>(State.DIRTY);
-
-  /**
-   * Indicate that changes have been made to the metric being tracked by this 
{@link DirtyState}.
-   *
-   * <p>Should be called <b>after</b> modification of the value.
-   */
-  public void afterModification() {
-    dirty.set(State.DIRTY);
-  }
-
-  /**
-   * Check the dirty state and mark the metric as committing.
-   *
-   * <p>If the state was {@code CLEAN}, this returns {@code false}. If the 
state was {@code DIRTY}
-   * or {@code COMMITTING} this returns {@code true} and sets the state to 
{@code COMMITTING}.
-   *
-   * @return {@code false} if the state is clean and {@code true} otherwise.
-   */
-  public boolean beforeCommit() {
-    // After this loop, we want the state to be either CLEAN or COMMITTING.
-    // If the state was CLEAN, we don't need to do anything (and exit the loop 
early)
-    // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). 
This will only
-    // fail if another thread is getting updates which generally shouldn't be 
the case.
-    // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, 
COMMITTING). This will
-    // fail if another thread commits updates (which shouldn't be the case) or 
if the user code
-    // updates the metric, in which case it will transition to DIRTY and the 
next iteration will
-    // successfully update it.
-    State state;
-    do {
-      state = dirty.get();
-    } while (state != State.CLEAN && !dirty.compareAndSet(state, 
State.COMMITTING));
-
-    return state != State.CLEAN;
-  }
-
-  /**
-   * Mark any changes up to the most recently call to {@link #beforeCommit()}} 
as committed.
-   * The next call to {@link #beforeCommit()} will return {@code false} unless 
there have
-   * been changes made since the previous call to {@link #beforeCommit()}.
-   */
-  public void afterCommit() {
-    dirty.compareAndSet(State.COMMITTING, State.CLEAN);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
deleted file mode 100644
index 93a3649..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * Tracks the current value (and delta) for a Distribution metric.
- *
- * <p>This class generally shouldn't be used directly. The only exception is 
within a runner where
- * a distribution is being reported for a specific step (rather than the 
distribution in the current
- * context). In that case retrieving the underlying cell and reporting 
directly to it avoids a step
- * of indirection.
- */
-@Experimental(Kind.METRICS)
-public class DistributionCell implements MetricCell<Distribution, 
DistributionData> {
-
-  private final DirtyState dirty = new DirtyState();
-  private final AtomicReference<DistributionData> value =
-      new AtomicReference<>(DistributionData.EMPTY);
-
-  /**
-   * Package-visibility because all {@link DistributionCell DistributionCells} 
should be created by
-   * {@link MetricsContainer#getDistribution(MetricName)}.
-   */
-  DistributionCell() {}
-
-  /** Increment the distribution by the given amount. */
-  public void update(long n) {
-    update(DistributionData.singleton(n));
-  }
-
-  @Override
-  public void update(DistributionData data) {
-    DistributionData original;
-    do {
-      original = value.get();
-    } while (!value.compareAndSet(original, original.combine(data)));
-    dirty.afterModification();
-  }
-
-  @Override
-  public void update(MetricCell<Distribution, DistributionData> other) {
-    update(other.getCumulative());
-  }
-
-  @Override
-  public DirtyState getDirty() {
-    return dirty;
-  }
-
-  @Override
-  public DistributionData getCumulative() {
-    return value.get();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
deleted file mode 100644
index 8068e1b..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-
-/**
- * Data describing the the distribution. This should retain enough detail that 
it can be combined
- * with other {@link DistributionData}.
- *
- * <p>This is kept distinct from {@link DistributionResult} since this may be 
extended to include
- * data necessary to approximate quantiles, etc. while {@link 
DistributionResult} would just include
- * the approximate value of those quantiles.
- */
-@AutoValue
-public abstract class DistributionData implements Serializable {
-
-  public abstract long sum();
-  public abstract long count();
-  public abstract long min();
-  public abstract long max();
-
-  public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, 
Long.MIN_VALUE);
-
-  public static DistributionData create(long sum, long count, long min, long 
max) {
-    return new AutoValue_DistributionData(sum, count, min, max);
-  }
-
-  public static DistributionData singleton(long value) {
-    return create(value, 1, value, value);
-  }
-
-  public DistributionData combine(DistributionData value) {
-    return create(
-        sum() + value.sum(),
-        count() + value.count(),
-        Math.min(value.min(), min()),
-        Math.max(value.max(), max()));
-  }
-
-  public DistributionResult extractResult() {
-    return DistributionResult.create(sum(), count(), min(), max());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
index 27c242c..b01ae46 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
@@ -18,10 +18,13 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 
 /**
  * The result of a {@link Distribution} metric.
  */
+@Experimental(Kind.METRICS)
 @AutoValue
 public abstract class DistributionResult {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
deleted file mode 100644
index 0cdd568..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * Tracks the current value (and delta) for a {@link Gauge} metric.
- *
- * <p>This class generally shouldn't be used directly. The only exception is 
within a runner where
- * a gauge is being reported for a specific step (rather than the gauge in the 
current
- * context). In that case retrieving the underlying cell and reporting 
directly to it avoids a step
- * of indirection.
- */
-@Experimental(Experimental.Kind.METRICS)
-public class GaugeCell implements MetricCell<Gauge, GaugeData> {
-
-  private final DirtyState dirty = new DirtyState();
-  private final AtomicReference<GaugeData> gaugeValue = new 
AtomicReference<>(GaugeData.empty());
-
-  /** Set the gauge to the given value. */
-  public void set(long value) {
-    update(GaugeData.create(value));
-  }
-
-  @Override
-  public void update(GaugeData data) {
-    GaugeData original;
-    do {
-      original = gaugeValue.get();
-    } while (!gaugeValue.compareAndSet(original, original.combine(data)));
-    dirty.afterModification();
-  }
-
-  @Override
-  public void update(MetricCell<Gauge, GaugeData> other) {
-    GaugeData original;
-    do {
-      original = gaugeValue.get();
-    } while (!gaugeValue.compareAndSet(original, 
original.combine(other.getCumulative())));
-    dirty.afterModification();
-    update(other.getCumulative());
-  }
-
-  @Override
-  public DirtyState getDirty() {
-    return dirty;
-  }
-
-  @Override
-  public GaugeData getCumulative() {
-    return gaugeValue.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
deleted file mode 100644
index bf3401d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import org.joda.time.Instant;
-
-/**
- * Data describing the gauge. This should retain enough detail that it can be 
combined with
- * other {@link GaugeData}.
- */
-@AutoValue
-public abstract class GaugeData implements Serializable {
-
-  public abstract long value();
-
-  public abstract Instant timestamp();
-
-  public static GaugeData create(long value) {
-    return new AutoValue_GaugeData(value, Instant.now());
-  }
-
-  public static GaugeData empty() {
-    return EmptyGaugeData.INSTANCE;
-  }
-
-  public GaugeData combine(GaugeData other) {
-    if (this.timestamp().isAfter(other.timestamp())) {
-      return this;
-    } else {
-      return other;
-    }
-  }
-
-  public GaugeResult extractResult() {
-    return GaugeResult.create(value(), timestamp());
-  }
-
-  /**
-   * Empty {@link GaugeData}, representing no values reported.
-   */
-  public static class EmptyGaugeData extends GaugeData {
-
-    private static final EmptyGaugeData INSTANCE = new EmptyGaugeData();
-    private static final Instant EPOCH = new Instant(0);
-
-    private EmptyGaugeData() {
-    }
-
-    @Override
-    public long value() {
-      return -1L;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return EPOCH;
-    }
-
-    @Override
-    public GaugeResult extractResult() {
-      return GaugeResult.empty();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
index 878776a..f24ded2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
@@ -18,11 +18,14 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.joda.time.Instant;
 
 /**
  * The result of a {@link Gauge} metric.
  */
+@Experimental(Kind.METRICS)
 @AutoValue
 public abstract class GaugeResult {
   public abstract long value();

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
index dcd8a04..fdcc93c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
@@ -18,10 +18,15 @@
 
 package org.apache.beam.sdk.metrics;
 
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
 /**
  * Marker interface for all user-facing metrics.
  */
-public interface Metric {
+@Experimental(Kind.METRICS)
+public interface Metric extends Serializable {
 
   /**
    * The {@link MetricName} given to this metric.

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
deleted file mode 100644
index 403cac2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * A {@link MetricCell} is used for accumulating in-memory changes to a 
metric. It represents a
- * specific metric name in a single context.
- *
- * @param <UserT> The type of the user interface for reporting changes to this 
cell.
- * @param <DataT> The type of metric data stored (and extracted) from this 
cell.
- */
-@Experimental(Kind.METRICS)
-public interface MetricCell<UserT extends Metric, DataT> extends Serializable {
-
-  /**
-   * Update value of this cell.
-   */
-  void update(DataT data);
-
-  /**
-   * Update value of this cell by merging the value of another cell.
-   */
-  void update(MetricCell<UserT, DataT> other);
-
-  /**
-   * Return the {@link DirtyState} tracking whether this metric cell contains 
uncommitted changes.
-   */
-  DirtyState getDirty();
-
-  /**
-   * Return the cumulative value of this metric.
-   */
-  DataT getCumulative();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
deleted file mode 100644
index a3e43e1..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import com.google.common.base.Objects;
-import java.util.Set;
-
-/**
- * Implements matching for metrics filters. Specifically, matching for metric 
name,
- * namespace, and step name.
- */
-public class MetricFiltering {
-
-  private MetricFiltering() { }
-
-  /** Matching logic is implemented here rather than in MetricsFilter because 
we would like
-   *  MetricsFilter to act as a "dumb" value-object, with the possibility of 
replacing it with
-   *  a Proto/JSON/etc. schema object.
-   * @param filter {@link MetricsFilter} with the matching information of an 
actual metric
-   * @param key {@link MetricKey} with the information of a metric
-   * @return whether the filter matches the key or not
-   */
-  public static boolean matches(MetricsFilter filter, MetricKey key) {
-    return filter == null
-        || (matchesName(key.metricName(), filter.names())
-        && matchesScope(key.stepName(), filter.steps()));
-  }
-
-  /**
-   * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
-   * represents a path within {@code haystack}. For example, "foo/bar" is in 
"a/foo/bar/b",
-   * but not "a/fool/bar/b" or "a/foo/bart/b".
-   */
-  public static boolean subPathMatches(String haystack, String needle) {
-    int location = haystack.indexOf(needle);
-    int end = location + needle.length();
-    if (location == -1) {
-      return false;  // needle not found
-    } else if (location != 0 && haystack.charAt(location - 1) != '/') {
-      return false; // the first entry in needle wasn't exactly matched
-    } else if (end != haystack.length() && haystack.charAt(end) != '/') {
-      return false; // the last entry in needle wasn't exactly matched
-    } else {
-      return true;
-    }
-  }
-
-  /**
-   * {@code matchesScope(actualScope, scopes)} returns true if the scope of a 
metric is matched
-   * by any of the filters in {@code scopes}. A metric scope is a path of type 
"A/B/D". A
-   * path is matched by a filter if the filter is equal to the path (e.g. 
"A/B/D", or
-   * if it represents a subpath within it (e.g. "A/B" or "B/D", but not 
"A/D"). */
-  public static boolean matchesScope(String actualScope, Set<String> scopes) {
-    if (scopes.isEmpty() || scopes.contains(actualScope)) {
-      return true;
-    }
-
-    // If there is no perfect match, a stage name-level match is tried.
-    // This is done by a substring search over the levels of the scope.
-    // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
-    for (String scope : scopes) {
-      if (subPathMatches(actualScope, scope)) {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-  private static boolean matchesName(MetricName metricName, 
Set<MetricNameFilter> nameFilters) {
-    if (nameFilters.isEmpty()) {
-      return true;
-    }
-    for (MetricNameFilter nameFilter : nameFilters) {
-      if ((nameFilter.getName() == null || 
nameFilter.getName().equals(metricName.name()))
-          && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) 
{
-        return true;
-      }
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
deleted file mode 100644
index 8706853..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * Metrics are keyed by the step name they are associated with and the name of 
the metric.
- */
-@Experimental(Kind.METRICS)
-@AutoValue
-public abstract class MetricKey implements Serializable {
-
-  /** The step name that is associated with this metric. */
-  public abstract String stepName();
-
-  /** The name of the metric. */
-  public abstract MetricName metricName();
-
-  public static MetricKey create(String stepName, MetricName metricName) {
-    return new AutoValue_MetricKey(stepName, metricName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
deleted file mode 100644
index 9cf6a5c..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.util.Collections;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * Representation of multiple metric updates.
- */
-@Experimental(Kind.METRICS)
-@AutoValue
-public abstract class MetricUpdates {
-
-  public static final MetricUpdates EMPTY = MetricUpdates.create(
-      Collections.<MetricUpdate<Long>>emptyList(),
-      Collections.<MetricUpdate<DistributionData>>emptyList(),
-      Collections.<MetricUpdate<GaugeData>>emptyList());
-
-  /**
-   * Representation of a single metric update.
-   * @param <T> The type of value representing the update.
-   */
-  @AutoValue
-  public abstract static class MetricUpdate<T> implements Serializable {
-
-    /** The key being updated. */
-    public abstract MetricKey getKey();
-    /** The value of the update. */
-    public abstract T getUpdate();
-
-    public static <T> MetricUpdate<T> create(MetricKey key, T update) {
-      return new AutoValue_MetricUpdates_MetricUpdate(key, update);
-    }
-  }
-
-  /** Returns true if there are no updates in this MetricUpdates object. */
-  public boolean isEmpty() {
-    return Iterables.isEmpty(counterUpdates())
-        && Iterables.isEmpty(distributionUpdates());
-  }
-
-  /** All of the counter updates. */
-  public abstract Iterable<MetricUpdate<Long>> counterUpdates();
-
-  /** All of the distribution updates. */
-  public abstract Iterable<MetricUpdate<DistributionData>> 
distributionUpdates();
-
-  /** All of the gauges updates. */
-  public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();
-
-  /** Create a new {@link MetricUpdates} bundle. */
-  public static MetricUpdates create(
-      Iterable<MetricUpdate<Long>> counterUpdates,
-      Iterable<MetricUpdate<DistributionData>> distributionUpdates,
-      Iterable<MetricUpdate<GaugeData>> gaugeUpdates) {
-    return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, 
gaugeUpdates);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 096d147..bad1f10 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -102,7 +102,7 @@ public class Metrics {
   }
 
   /** Implementation of {@link Counter} that delegates to the instance for the 
current context. */
-  private static class DelegatingCounter implements Counter, Serializable {
+  private static class DelegatingCounter implements Metric, Counter, 
Serializable {
     private final MetricName name;
 
     private DelegatingCounter(MetricName name) {
@@ -118,7 +118,7 @@ public class Metrics {
     @Override public void inc(long n) {
       MetricsContainer container = MetricsEnvironment.getCurrentContainer();
       if (container != null) {
-        container.getCounter(name).update(n);
+        container.getCounter(name).inc(n);
       }
     }
 
@@ -140,7 +140,7 @@ public class Metrics {
   /**
    * Implementation of {@link Distribution} that delegates to the instance for 
the current context.
    */
-  private static class DelegatingDistribution implements Distribution, 
Serializable {
+  private static class DelegatingDistribution implements Metric, Distribution, 
Serializable {
     private final MetricName name;
 
     private DelegatingDistribution(MetricName name) {
@@ -163,7 +163,7 @@ public class Metrics {
   /**
    * Implementation of {@link Gauge} that delegates to the instance for the 
current context.
    */
-  private static class DelegatingGauge implements Gauge, Serializable {
+  private static class DelegatingGauge implements Metric, Gauge, Serializable {
     private final MetricName name;
 
     private DelegatingGauge(MetricName name) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index 48fa359..62b0806 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -15,164 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.metrics;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+package org.apache.beam.sdk.metrics;
 
-import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
-import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 
 /**
  * Holds the metrics for a single step and unit-of-commit (bundle).
- *
- * <p>This class is thread-safe. It is intended to be used with 1 (or more) 
threads are updating
- * metrics and at-most 1 thread is extracting updates by calling {@link 
#getUpdates} and
- * {@link #commitUpdates}. Outside of this it is still safe. Although races in 
the update extraction
- * may cause updates that don't actually have any changes, it will never lose 
an update.
- *
- * <p>For consistency, all threads that update metrics should finish before 
getting the final
- * cumulative values/updates.
  */
 @Experimental(Kind.METRICS)
-public class MetricsContainer implements Serializable {
-
-  private final String stepName;
-
-  private MetricsMap<MetricName, CounterCell> counters =
-      new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() {
-        @Override
-        public CounterCell createInstance(MetricName unusedKey) {
-          return new CounterCell();
-        }
-      });
-
-  private MetricsMap<MetricName, DistributionCell> distributions =
-      new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() {
-        @Override
-        public DistributionCell createInstance(MetricName unusedKey) {
-          return new DistributionCell();
-        }
-      });
-
-  private MetricsMap<MetricName, GaugeCell> gauges =
-      new MetricsMap<>(new MetricsMap.Factory<MetricName, GaugeCell>() {
-        @Override
-        public GaugeCell createInstance(MetricName unusedKey) {
-          return new GaugeCell();
-        }
-      });
-
-  /**
-   * Create a new {@link MetricsContainer} associated with the given {@code 
stepName}.
-   */
-  public MetricsContainer(String stepName) {
-    this.stepName = stepName;
-  }
+public interface MetricsContainer extends Serializable {
 
   /**
-   * Return the {@link CounterCell} that should be used for implementing the 
given
+   * Return the {@link Counter} that should be used for implementing the given
    * {@code metricName} in this container.
    */
-  public CounterCell getCounter(MetricName metricName) {
-    return counters.get(metricName);
-  }
+  Counter getCounter(MetricName metricName);
 
   /**
-   * Return the {@link DistributionCell} that should be used for implementing 
the given
+   * Return the {@link Distribution} that should be used for implementing the 
given
    * {@code metricName} in this container.
    */
-  public DistributionCell getDistribution(MetricName metricName) {
-    return distributions.get(metricName);
-  }
+  Distribution getDistribution(MetricName metricName);
 
   /**
-   * Return the {@link GaugeCell} that should be used for implementing the 
given
+   * Return the {@link Gauge} that should be used for implementing the given
    * {@code metricName} in this container.
    */
-  public GaugeCell getGauge(MetricName metricName) {
-    return gauges.get(metricName);
-  }
-
-  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, 
UpdateT>>
-  ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
-      MetricsMap<MetricName, CellT> cells) {
-    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = 
ImmutableList.builder();
-    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
-      if (cell.getValue().getDirty().beforeCommit()) {
-        updates.add(MetricUpdate.create(MetricKey.create(stepName, 
cell.getKey()),
-            cell.getValue().getCumulative()));
-      }
-    }
-    return updates.build();
-  }
-
-  /**
-   * Return the cumulative values for any metrics that have changed since the 
last time updates were
-   * committed.
-   */
-  public MetricUpdates getUpdates() {
-    return MetricUpdates.create(
-        extractUpdates(counters),
-        extractUpdates(distributions),
-        extractUpdates(gauges));
-  }
-
-  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, 
?>> cells) {
-    for (MetricCell<?, ?> cell : cells.values()) {
-      cell.getDirty().afterCommit();
-    }
-  }
-
-  /**
-   * Mark all of the updates that were retrieved with the latest call to 
{@link #getUpdates()} as
-   * committed.
-   */
-  public void commitUpdates() {
-    commitUpdates(counters);
-    commitUpdates(distributions);
-    commitUpdates(gauges);
-  }
-
-  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, 
UpdateT>>
-  ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
-      MetricsMap<MetricName, CellT> cells) {
-    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = 
ImmutableList.builder();
-    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
-      UpdateT update = checkNotNull(cell.getValue().getCumulative());
-      updates.add(MetricUpdate.create(MetricKey.create(stepName, 
cell.getKey()), update));
-    }
-    return updates.build();
-  }
-
-  /**
-   * Return the {@link MetricUpdates} representing the cumulative values of 
all metrics in this
-   * container.
-   */
-  public MetricUpdates getCumulative() {
-    return MetricUpdates.create(
-        extractCumulatives(counters),
-        extractCumulatives(distributions),
-        extractCumulatives(gauges));
-  }
-
-  /**
-   * Update values of this {@link MetricsContainer} by merging the value of 
another cell.
-   */
-  public void update(MetricsContainer other) {
-    updateCells(counters, other.counters);
-    updateCells(distributions, other.distributions);
-    updateCells(gauges, other.gauges);
-  }
-
-  private <UserT extends Metric, DataT, CellT extends MetricCell<UserT, 
DataT>> void updateCells(
-      MetricsMap<MetricName, CellT> current,
-      MetricsMap<MetricName, CellT> updates) {
-    for (Map.Entry<MetricName, CellT> counter : updates.entries()) {
-      current.get(counter.getKey()).update(counter.getValue());
-    }
-  }
+  Gauge getGauge(MetricName metricName);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
deleted file mode 100644
index d01e970..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
-
-/**
- * Metrics containers by step.
- *
- * <p>This class is not thread-safe.</p>
- */
-public class MetricsContainerStepMap implements Serializable {
-  private Map<String, MetricsContainer> metricsContainers;
-
-  public MetricsContainerStepMap() {
-    this.metricsContainers = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Returns the container for the given step name.
-   */
-  public MetricsContainer getContainer(String stepName) {
-    if (!metricsContainers.containsKey(stepName)) {
-      metricsContainers.put(stepName, new MetricsContainer(stepName));
-    }
-    return metricsContainers.get(stepName);
-  }
-
-  /**
-   * Update this {@link MetricsContainerStepMap} with all values from given
-   * {@link MetricsContainerStepMap}.
-   */
-  public void updateAll(MetricsContainerStepMap other) {
-    for (Map.Entry<String, MetricsContainer> container : 
other.metricsContainers.entrySet()) {
-      getContainer(container.getKey()).update(container.getValue());
-    }
-  }
-
-  /**
-   * Update {@link MetricsContainer} for given step in this map with all 
values from given
-   * {@link MetricsContainer}.
-   */
-  public void update(String step, MetricsContainer container) {
-    getContainer(step).update(container);
-  }
-
-  /**
-   * Returns {@link MetricResults} based on given
-   * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and 
committed metrics.
-   *
-   * <p>This constructor is intended for runners which support both attempted 
and committed
-   * metrics.
-   */
-  public static MetricResults asMetricResults(
-      MetricsContainerStepMap attemptedMetricsContainers,
-      MetricsContainerStepMap committedMetricsContainers) {
-    return new MetricsContainerStepMapMetricResults(
-        attemptedMetricsContainers,
-        committedMetricsContainers);
-  }
-
-  /**
-   * Returns {@link MetricResults} based on given {@link 
MetricsContainerStepMap} of attempted
-   * metrics.
-   *
-   * <p>This constructor is intended for runners which only support 
`attempted` metrics.
-   * Accessing {@link MetricResult#committed()} in the resulting {@link 
MetricResults} will result
-   * in an {@link UnsupportedOperationException}.</p>
-   */
-  public static MetricResults asAttemptedOnlyMetricResults(
-      MetricsContainerStepMap attemptedMetricsContainers) {
-    return new 
MetricsContainerStepMapMetricResults(attemptedMetricsContainers);
-  }
-
-  private Map<String, MetricsContainer> getMetricsContainers() {
-    return metricsContainers;
-  }
-
-  private static class MetricsContainerStepMapMetricResults extends 
MetricResults {
-    private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new 
HashMap<>();
-    private final Map<MetricKey, AttemptedAndCommitted<DistributionData>> 
distributions =
-        new HashMap<>();
-    private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges = 
new HashMap<>();
-    private final boolean isCommittedSupported;
-
-    private MetricsContainerStepMapMetricResults(
-        MetricsContainerStepMap attemptedMetricsContainers) {
-      this(attemptedMetricsContainers, new MetricsContainerStepMap(), false);
-    }
-
-    private MetricsContainerStepMapMetricResults(
-        MetricsContainerStepMap attemptedMetricsContainers,
-        MetricsContainerStepMap committedMetricsContainers) {
-      this(attemptedMetricsContainers, committedMetricsContainers, true);
-    }
-
-    private MetricsContainerStepMapMetricResults(
-        MetricsContainerStepMap attemptedMetricsContainers,
-        MetricsContainerStepMap committedMetricsContainers,
-        boolean isCommittedSupported) {
-      for (MetricsContainer container
-          : attemptedMetricsContainers.getMetricsContainers().values()) {
-        MetricUpdates cumulative = container.getCumulative();
-        mergeCounters(counters, cumulative.counterUpdates(), 
attemptedCounterUpdateFn());
-        mergeDistributions(distributions, cumulative.distributionUpdates(),
-            attemptedDistributionUpdateFn());
-        mergeGauges(gauges, cumulative.gaugeUpdates(), 
attemptedGaugeUpdateFn());
-      }
-      for (MetricsContainer container
-          : committedMetricsContainers.getMetricsContainers().values()) {
-        MetricUpdates cumulative = container.getCumulative();
-        mergeCounters(counters, cumulative.counterUpdates(), 
committedCounterUpdateFn());
-        mergeDistributions(distributions, cumulative.distributionUpdates(),
-            committedDistributionUpdateFn());
-        mergeGauges(gauges, cumulative.gaugeUpdates(), 
committedGaugeUpdateFn());
-      }
-      this.isCommittedSupported = isCommittedSupported;
-    }
-
-    private Function<MetricUpdate<DistributionData>, 
AttemptedAndCommitted<DistributionData>>
-    attemptedDistributionUpdateFn() {
-      return new Function<MetricUpdate<DistributionData>,
-          AttemptedAndCommitted<DistributionData>>() {
-        @Override
-        public AttemptedAndCommitted<DistributionData> 
apply(MetricUpdate<DistributionData> input) {
-          MetricKey key = input.getKey();
-          return new AttemptedAndCommitted<>(
-              key,
-              input,
-              MetricUpdate.create(key, DistributionData.EMPTY));
-        }
-      };
-    }
-
-    private Function<MetricUpdate<DistributionData>, 
AttemptedAndCommitted<DistributionData>>
-    committedDistributionUpdateFn() {
-      return new Function<MetricUpdate<DistributionData>,
-          AttemptedAndCommitted<DistributionData>>() {
-        @Override
-        public AttemptedAndCommitted<DistributionData> 
apply(MetricUpdate<DistributionData> input) {
-          MetricKey key = input.getKey();
-          return new AttemptedAndCommitted<>(
-              key,
-              MetricUpdate.create(key, DistributionData.EMPTY),
-              input);
-        }
-      };
-    }
-
-    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
-    attemptedGaugeUpdateFn() {
-      return new Function<MetricUpdate<GaugeData>, 
AttemptedAndCommitted<GaugeData>>() {
-        @Override
-        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> 
input) {
-          MetricKey key = input.getKey();
-          return new AttemptedAndCommitted<>(
-              key,
-              input,
-              MetricUpdate.create(key, GaugeData.empty()));
-        }
-      };
-    }
-
-    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
-    committedGaugeUpdateFn() {
-      return new Function<MetricUpdate<GaugeData>, 
AttemptedAndCommitted<GaugeData>>() {
-        @Override
-        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> 
input) {
-          MetricKey key = input.getKey();
-          return new AttemptedAndCommitted<>(
-              key,
-              MetricUpdate.create(key, GaugeData.empty()),
-              input);
-        }
-      };
-    }
-
-    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> 
attemptedCounterUpdateFn() {
-      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
-        @Override
-        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
-          MetricKey key = input.getKey();
-          return new AttemptedAndCommitted<>(
-              key,
-              input,
-              MetricUpdate.create(key, 0L));
-        }
-      };
-    }
-
-    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> 
committedCounterUpdateFn() {
-      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
-        @Override
-        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
-          MetricKey key = input.getKey();
-          return new AttemptedAndCommitted<>(
-              key,
-              MetricUpdate.create(key, 0L),
-              input);
-        }
-      };
-    }
-
-    @Override
-    public MetricQueryResults queryMetrics(MetricsFilter filter) {
-      return new QueryResults(filter);
-    }
-
-    private class QueryResults implements MetricQueryResults {
-      private final MetricsFilter filter;
-
-      private QueryResults(MetricsFilter filter) {
-        this.filter = filter;
-      }
-
-      @Override
-      public Iterable<MetricResult<Long>> counters() {
-        return
-            FluentIterable
-                .from(counters.values())
-                .filter(matchesFilter(filter))
-                .transform(counterUpdateToResult())
-                .toList();
-      }
-
-      @Override
-      public Iterable<MetricResult<DistributionResult>> distributions() {
-        return
-            FluentIterable
-                .from(distributions.values())
-                .filter(matchesFilter(filter))
-                .transform(distributionUpdateToResult())
-                .toList();
-      }
-
-      @Override
-      public Iterable<MetricResult<GaugeResult>> gauges() {
-        return
-            FluentIterable
-                .from(gauges.values())
-                .filter(matchesFilter(filter))
-                .transform(gaugeUpdateToResult())
-                .toList();
-      }
-
-      private Predicate<AttemptedAndCommitted<?>> matchesFilter(final 
MetricsFilter filter) {
-        return new Predicate<AttemptedAndCommitted<?>>() {
-          @Override
-          public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted) 
{
-            return MetricFiltering.matches(filter, 
attemptedAndCommitted.getKey());
-          }
-        };
-      }
-    }
-
-    private Function<AttemptedAndCommitted<Long>, MetricResult<Long>> 
counterUpdateToResult() {
-      return new
-          Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() {
-            @Override
-            public MetricResult<Long>
-            apply(AttemptedAndCommitted<Long> metricResult) {
-              MetricKey key = metricResult.getKey();
-              return new AccumulatedMetricResult<>(
-                  key.metricName(),
-                  key.stepName(),
-                  metricResult.getAttempted().getUpdate(),
-                  isCommittedSupported
-                      ? metricResult.getCommitted().getUpdate()
-                      : null,
-                  isCommittedSupported);
-            }
-          };
-    }
-
-    private Function<AttemptedAndCommitted<DistributionData>, 
MetricResult<DistributionResult>>
-    distributionUpdateToResult() {
-      return new
-          Function<AttemptedAndCommitted<DistributionData>, 
MetricResult<DistributionResult>>() {
-            @Override
-            public MetricResult<DistributionResult>
-            apply(AttemptedAndCommitted<DistributionData> metricResult) {
-              MetricKey key = metricResult.getKey();
-              return new AccumulatedMetricResult<>(
-                  key.metricName(),
-                  key.stepName(),
-                  metricResult.getAttempted().getUpdate().extractResult(),
-                  isCommittedSupported
-                      ? metricResult.getCommitted().getUpdate().extractResult()
-                      : null,
-                  isCommittedSupported);
-            }
-          };
-    }
-
-    private Function<AttemptedAndCommitted<GaugeData>, 
MetricResult<GaugeResult>>
-    gaugeUpdateToResult() {
-      return new
-          Function<AttemptedAndCommitted<GaugeData>, 
MetricResult<GaugeResult>>() {
-            @Override
-            public MetricResult<GaugeResult>
-            apply(AttemptedAndCommitted<GaugeData> metricResult) {
-              MetricKey key = metricResult.getKey();
-              return new AccumulatedMetricResult<>(
-                  key.metricName(),
-                  key.stepName(),
-                  metricResult.getAttempted().getUpdate().extractResult(),
-                  isCommittedSupported
-                      ? metricResult.getCommitted().getUpdate().extractResult()
-                      : null,
-                  isCommittedSupported);
-            }
-          };
-    }
-
-    @SuppressWarnings("ConstantConditions")
-    private void mergeCounters(
-        Map<MetricKey, AttemptedAndCommitted<Long>> counters,
-        Iterable<MetricUpdate<Long>> updates,
-        Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> 
updateToAttemptedAndCommittedFn) {
-      for (MetricUpdate<Long> metricUpdate : updates) {
-        MetricKey key = metricUpdate.getKey();
-        AttemptedAndCommitted<Long> update =
-            updateToAttemptedAndCommittedFn.apply(metricUpdate);
-        if (counters.containsKey(key)) {
-          AttemptedAndCommitted<Long> current = counters.get(key);
-          update = new AttemptedAndCommitted<>(
-              key,
-              MetricUpdate.create(
-                  key,
-                  update.getAttempted().getUpdate() + 
current.getAttempted().getUpdate()),
-              MetricUpdate.create(
-                  key,
-                  update.getCommitted().getUpdate() + 
current.getCommitted().getUpdate()));
-        }
-        counters.put(key, update);
-      }
-    }
-
-    @SuppressWarnings("ConstantConditions")
-    private void mergeDistributions(
-        Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions,
-        Iterable<MetricUpdate<DistributionData>> updates,
-        Function<MetricUpdate<DistributionData>, 
AttemptedAndCommitted<DistributionData>>
-            updateToAttemptedAndCommittedFn) {
-      for (MetricUpdate<DistributionData> metricUpdate : updates) {
-        MetricKey key = metricUpdate.getKey();
-        AttemptedAndCommitted<DistributionData> update =
-            updateToAttemptedAndCommittedFn.apply(metricUpdate);
-        if (distributions.containsKey(key)) {
-          AttemptedAndCommitted<DistributionData> current = 
distributions.get(key);
-          update = new AttemptedAndCommitted<>(
-              key,
-              MetricUpdate.create(
-                  key,
-                  
update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
-              MetricUpdate.create(
-                  key,
-                  
update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
-        }
-        distributions.put(key, update);
-      }
-    }
-
-    @SuppressWarnings("ConstantConditions")
-    private void mergeGauges(
-        Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges,
-        Iterable<MetricUpdate<GaugeData>> updates,
-        Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
-            updateToAttemptedAndCommittedFn) {
-      for (MetricUpdate<GaugeData> metricUpdate : updates) {
-        MetricKey key = metricUpdate.getKey();
-        AttemptedAndCommitted<GaugeData> update =
-            updateToAttemptedAndCommittedFn.apply(metricUpdate);
-        if (gauges.containsKey(key)) {
-          AttemptedAndCommitted<GaugeData> current = gauges.get(key);
-          update = new AttemptedAndCommitted<>(
-              key,
-              MetricUpdate.create(
-                  key,
-                  
update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
-              MetricUpdate.create(
-                  key,
-                  
update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
-        }
-        gauges.put(key, update);
-      }
-    }
-
-    /**
-     * Accumulated implementation of {@link MetricResult}.
-     */
-    private static class AccumulatedMetricResult<T> implements MetricResult<T> 
{
-      private final MetricName name;
-      private final String step;
-      private final T attempted;
-      private final T committed;
-      private final boolean isCommittedSupported;
-
-      private AccumulatedMetricResult(
-          MetricName name,
-          String step,
-          T attempted,
-          T committed,
-          boolean isCommittedSupported) {
-        this.name = name;
-        this.step = step;
-        this.attempted = attempted;
-        this.committed = committed;
-        this.isCommittedSupported = isCommittedSupported;
-      }
-
-      @Override
-      public MetricName name() {
-        return name;
-      }
-
-      @Override
-      public String step() {
-        return step;
-      }
-
-      @Override
-      public T committed() {
-        if (!isCommittedSupported) {
-          throw new UnsupportedOperationException("This runner does not 
currently support committed"
-              + " metrics results. Please use 'attempted' instead.");
-        }
-        return committed;
-      }
-
-      @Override
-      public T attempted() {
-        return attempted;
-      }
-    }
-
-    /**
-     * Attempted and committed {@link MetricUpdate MetricUpdates}.
-     */
-    private static class AttemptedAndCommitted<T> {
-      private final MetricKey key;
-      private final MetricUpdate<T> attempted;
-      private final MetricUpdate<T> committed;
-
-      private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted,
-          MetricUpdate<T> committed) {
-        this.key = key;
-        this.attempted = attempted;
-        this.committed = committed;
-      }
-
-      private MetricKey getKey() {
-        return key;
-      }
-
-      private MetricUpdate<T> getAttempted() {
-        return attempted;
-      }
-
-      private MetricUpdate<T> getCommitted() {
-        return committed;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index a4b311f..dadec33 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -21,6 +21,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * <p>Users should not interact directly with this class. Instead, use {@link 
Metrics} and the
  * returned objects to create and modify metrics.
  *
- * <p>The runner should create {@link MetricsContainer} for each context in 
which metrics are
+ * <p>The runner should create a {@link MetricsContainer} for each context in 
which metrics are
  * reported (by step and name) and call {@link #setCurrentContainer} before 
invoking any code that
  * may update metrics within that step. It should call {@link 
#setCurrentContainer} again to restore
  * the previous container.
@@ -39,9 +42,11 @@ import org.slf4j.LoggerFactory;
  * container for the current thread and get a {@link Closeable} that will 
restore the previous
  * container when closed.
  */
+@Experimental(Kind.METRICS)
+@Internal
 public class MetricsEnvironment {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(MetricsContainer.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricsEnvironment.class);
 
   private static final AtomicBoolean METRICS_SUPPORTED = new 
AtomicBoolean(false);
   private static final AtomicBoolean REPORTED_MISSING_CONTAINER = new 
AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
deleted file mode 100644
index 8c26f18..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/**
- * A map from {@code K} to {@code T} that supports getting or creating values 
associated with a key
- * in a thread-safe manner.
- */
-@Experimental(Kind.METRICS)
-public class MetricsMap<K, T> implements Serializable {
-
-  /** Interface for creating instances to populate the {@link MetricsMap}. */
-  public interface Factory<K, T> extends Serializable {
-    /**
-     * Create an instance of {@code T} to use with the given {@code key}.
-     *
-     * <p>It must be safe to call this from multiple threads.
-     */
-    T createInstance(K key);
-  }
-
-  private final Factory<K, T> factory;
-  private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>();
-
-  public MetricsMap(Factory<K, T> factory) {
-    this.factory = factory;
-  }
-
-  /**
-   * Get or create the value associated with the given key.
-   */
-  public T get(K key) {
-    T metric = metrics.get(key);
-    if (metric == null) {
-      metric = factory.createInstance(key);
-      metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric), 
metric);
-    }
-    return metric;
-  }
-
-  /**
-   * Get the value associated with the given key, if it exists.
-   */
-  @Nullable
-  public T tryGet(K key) {
-    return metrics.get(key);
-  }
-
-  /**
-   * Return an iterable over the entries in the current {@link  MetricsMap}.
-   */
-  public Iterable<Map.Entry<K, T>> entries() {
-    return Iterables.unmodifiableIterable(metrics.entrySet());
-  }
-
-  /**
-   * Return an iterable over the values in the current {@link MetricsMap}.
-   */
-  public Iterable<T> values() {
-    return Iterables.unmodifiableIterable(metrics.values());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java
index f96b6ac..f6f00bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java
@@ -17,9 +17,13 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
 /**
  * Standard Sink Metrics.
  */
+@Experimental(Kind.METRICS)
 public class SinkMetrics {
 
   private static final String SINK_NAMESPACE = "sink";

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java
index 4479f3a..1adbf0c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java
@@ -18,10 +18,13 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.common.base.Joiner;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 
 /**
  * Standard {@link org.apache.beam.sdk.io.Source} Metrics.
  */
+@Experimental(Kind.METRICS)
 public class SourceMetrics {
 
   private static final String SOURCE_NAMESPACE = "source";

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
deleted file mode 100644
index 26554d4..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link CounterCell}.
- */
-@RunWith(JUnit4.class)
-public class CounterCellTest {
-
-  private CounterCell cell = new CounterCell();
-
-  @Test
-  public void testDeltaAndCumulative() {
-    cell.update(5);
-    cell.update(7);
-    assertThat(cell.getCumulative(), equalTo(12L));
-    assertThat("getCumulative is idempotent", cell.getCumulative(), 
equalTo(12L));
-
-    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
-    cell.getDirty().afterCommit();
-    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
-    assertThat(cell.getCumulative(), equalTo(12L));
-
-    cell.update(30);
-    assertThat(cell.getCumulative(), equalTo(42L));
-
-    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
-    cell.getDirty().afterCommit();
-    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
deleted file mode 100644
index d00f8cd..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DirtyStateTest}.
- */
-@RunWith(JUnit4.class)
-public class DirtyStateTest {
-
-  private final DirtyState dirty = new DirtyState();
-
-  @Test
-  public void basicPath() {
-    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
-    dirty.afterCommit();
-    assertThat("Should be clean after commit", dirty.beforeCommit(), 
is(false));
-
-    dirty.afterModification();
-    assertThat("Should be dirty after change", dirty.beforeCommit(), is(true));
-    dirty.afterCommit();
-    assertThat("Should be clean after commit", dirty.beforeCommit(), 
is(false));
-  }
-
-  @Test
-  public void changeAfterBeforeCommit() {
-    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
-    dirty.afterModification();
-    dirty.afterCommit();
-    assertThat("Changes after beforeCommit should be dirty after afterCommit",
-        dirty.beforeCommit(), is(true));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
deleted file mode 100644
index 07e0b26..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DistributionCell}.
- */
-@RunWith(JUnit4.class)
-public class DistributionCellTest {
-  private DistributionCell cell = new DistributionCell();
-
-  @Test
-  public void testDeltaAndCumulative() {
-    cell.update(5);
-    cell.update(7);
-    assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 
7)));
-    assertThat("getCumulative is idempotent",
-        cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7)));
-
-    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
-    cell.getDirty().afterCommit();
-    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
-
-    cell.update(30);
-    assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 
30)));
-
-    assertThat("Adding a new value made the cell dirty",
-        cell.getDirty().beforeCommit(), equalTo(true));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
deleted file mode 100644
index 7dbfc4b..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-
-/**
- * Tests for {@link GaugeCell}.
- */
-public class GaugeCellTest {
-  private GaugeCell cell = new GaugeCell();
-
-  @Test
-  public void testDeltaAndCumulative() {
-    cell.set(5);
-    cell.set(7);
-    assertThat(cell.getCumulative().value(), 
equalTo(GaugeData.create(7).value()));
-    assertThat("getCumulative is idempotent",
-        cell.getCumulative().value(), equalTo(7L));
-
-    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
-    cell.getDirty().afterCommit();
-    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
-
-    cell.set(30);
-    assertThat(cell.getCumulative().value(), equalTo(30L));
-
-    assertThat("Adding a new value made the cell dirty",
-        cell.getDirty().beforeCommit(), equalTo(true));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
deleted file mode 100644
index dc2fa0a..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashSet;
-import java.util.Set;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link MetricFiltering}.
- */
-@RunWith(JUnit4.class)
-public class MetricFilteringTest {
-  private static final MetricName NAME1 = MetricName.named("ns1", "name1");
-
-
-  private boolean matchesSubPath(String actualScope, String subPath) {
-    return MetricFiltering.subPathMatches(actualScope, subPath);
-  }
-
-  @Test
-  public void testMatchCompositeStepNameFilters() {
-    // MetricsFilter with a Class-namespace + name filter + step filter.
-    // Successful match.
-    assertTrue(MetricFiltering.matches(
-        MetricsFilter.builder().addNameFilter(
-            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
-            .addStep("myStep").build(),
-        MetricKey.create(
-            "myBigStep/myStep", MetricName.named(MetricFilteringTest.class, 
"myMetricName"))));
-
-    // Unsuccessful match.
-    assertFalse(MetricFiltering.matches(
-        MetricsFilter.builder().addNameFilter(
-            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
-            .addStep("myOtherStep").build(),
-        MetricKey.create(
-            "myOtherStepNoMatch/myStep",
-            MetricName.named(MetricFilteringTest.class, "myMetricName"))));
-  }
-
-  @Test
-  public void testMatchStepNameFilters() {
-    // MetricsFilter with a Class-namespace + name filter + step filter.
-    // Successful match.
-    assertTrue(MetricFiltering.matches(
-        MetricsFilter.builder().addNameFilter(
-            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
-        .addStep("myStep").build(),
-        MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, 
"myMetricName"))));
-
-    // Unsuccessful match.
-    assertFalse(MetricFiltering.matches(
-        MetricsFilter.builder().addNameFilter(
-            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
-        .addStep("myOtherStep").build(),
-        MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, 
"myMetricName"))));
-  }
-
-  @Test
-  public void testMatchClassNamespaceFilters() {
-    // MetricsFilter with a Class-namespace + name filter. Without step filter.
-    // Successful match.
-    assertTrue(MetricFiltering.matches(
-        MetricsFilter.builder().addNameFilter(
-            MetricNameFilter.named(MetricFilteringTest.class, 
"myMetricName")).build(),
-        MetricKey.create("anyStep", 
MetricName.named(MetricFilteringTest.class, "myMetricName"))));
-
-    // Unsuccessful match.
-    assertFalse(MetricFiltering.matches(
-        MetricsFilter.builder().addNameFilter(
-            MetricNameFilter.named(MetricFilteringTest.class, 
"myMetricName")).build(),
-        MetricKey.create("anyStep", MetricName.named(MetricFiltering.class, 
"myMetricName"))));
-  }
-
-  @Test
-  public void testMatchStringNamespaceFilters() {
-    // MetricsFilter with a String-namespace + name filter. Without step 
filter.
-    // Successful match.
-    assertTrue(
-        MetricFiltering.matches(
-            MetricsFilter.builder().addNameFilter(
-                MetricNameFilter.named("myNamespace", "myMetricName")).build(),
-            MetricKey.create("anyStep", MetricName.named("myNamespace", 
"myMetricName"))));
-
-    // Unsuccessful match.
-    assertFalse(
-        MetricFiltering.matches(
-            MetricsFilter.builder().addNameFilter(
-                MetricNameFilter.named("myOtherNamespace", 
"myMetricName")).build(),
-            MetricKey.create("anyStep", MetricName.named("myNamespace", 
"myMetricname"))));
-  }
-
-  @Test
-  public void testMatchesSubPath() {
-    assertTrue("Match of the first element",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
-    assertTrue("Match of the first elements",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
-    assertTrue("Match of the last elements",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
-    assertFalse("Substring match but no subpath match",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
-    assertFalse("Substring match from start - but no subpath match",
-        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
-  }
-
-  private boolean matchesScopeWithSingleFilter(String actualScope, String 
filter) {
-    Set<String> scopeFilter = new HashSet<String>();
-    scopeFilter.add(filter);
-    return MetricFiltering.matchesScope(actualScope, scopeFilter);
-  }
-
-  @Test
-  public void testMatchesScope() {
-    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", 
"Top1"));
-    assertTrue(matchesScopeWithSingleFilter(
-        "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
-    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", 
"Top1/Outer1"));
-    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", 
"Top1/Outer1/Inner1"));
-    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", 
"Top1/Inner1"));
-    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", 
"Top1/Outer1/Inn"));
-  }
-}

Reply via email to