Repository: beam
Updated Branches:
  refs/heads/master f3cff3695 -> 30dbaf891


[BEAM-1958] Standard IO Metrics in Java SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/41d52be0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/41d52be0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/41d52be0

Branch: refs/heads/master
Commit: 41d52be0ec64c83a79d97bfd3c27eb104b546991
Parents: f3cff36
Author: Aviem Zur <aviem...@gmail.com>
Authored: Thu Apr 13 20:27:33 2017 +0300
Committer: Aviem Zur <aviem...@gmail.com>
Committed: Wed Apr 26 19:53:29 2017 +0300

----------------------------------------------------------------------
 .../streaming/StreamingSourceMetricsTest.java   |  12 +-
 .../org/apache/beam/sdk/io/CountingSource.java  |   6 +-
 .../apache/beam/sdk/metrics/CounterCell.java    |  19 +--
 .../beam/sdk/metrics/DistributionCell.java      |  10 +-
 .../org/apache/beam/sdk/metrics/GaugeCell.java  |   8 +-
 .../org/apache/beam/sdk/metrics/Metric.java     |   8 +-
 .../org/apache/beam/sdk/metrics/MetricCell.java |   8 +-
 .../org/apache/beam/sdk/metrics/Metrics.java    |  12 ++
 .../beam/sdk/metrics/MetricsContainer.java      |   8 +-
 .../apache/beam/sdk/metrics/SinkMetrics.java    |  49 ++++++++
 .../apache/beam/sdk/metrics/SourceMetrics.java  | 116 +++++++++++++++++++
 .../apache/beam/sdk/metrics/MetricsTest.java    |  19 ++-
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   4 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |   5 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |   4 +-
 15 files changed, 228 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index 80f7f53..5a4b1b5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -29,9 +29,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,6 +43,7 @@ import org.junit.Test;
  * Verify metrics support for {@link Source Sources} in streaming pipelines.
  */
 public class StreamingSourceMetricsTest implements Serializable {
+  private static final MetricName ELEMENTS_READ = 
SourceMetrics.elementsRead().getName();
 
   // Force streaming pipeline using pipeline rule.
   @Rule
@@ -65,10 +68,15 @@ public class StreamingSourceMetricsTest implements Serializable {
             .metrics()
             .queryMetrics(
                 MetricsFilter.builder()
-                    .addNameFilter(MetricNameFilter.named("io", 
"elementsRead"))
+                    .addNameFilter(
+                        MetricNameFilter.named(ELEMENTS_READ.namespace(), 
ELEMENTS_READ.name()))
                     .build());
 
     assertThat(metrics.counters(), hasItem(
-        attemptedMetricsResult("io", "elementsRead", 
"Read(UnboundedCountingSource)", 1000L)));
+        attemptedMetricsResult(
+            ELEMENTS_READ.namespace(),
+            ELEMENTS_READ.name(),
+            "Read(UnboundedCountingSource)",
+            1000L)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index b66a8b2..81082e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
@@ -190,7 +190,7 @@ public class CountingSource {
   private static class BoundedCountingReader extends 
OffsetBasedSource.OffsetBasedReader<Long> {
     private long current;
 
-    private final Counter elementsRead = Metrics.counter("io", "elementsRead");
+    private final Counter elementsRead = SourceMetrics.elementsRead();
 
     public BoundedCountingReader(OffsetBasedSource<Long> source) {
       super(source);
@@ -354,7 +354,7 @@ public class CountingSource {
     private Instant currentTimestamp;
     private Instant firstStarted;
 
-    private final Counter elementsRead = Metrics.counter("io", "elementsRead");
+    private final Counter elementsRead = SourceMetrics.elementsRead();
 
     public UnboundedCountingReader(UnboundedCountingSource source, CounterMark 
mark) {
       this.source = source;

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
index 93700e6..7ab5ebc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * indirection.
  */
 @Experimental(Kind.METRICS)
-public class CounterCell implements MetricCell<Counter, Long>, Counter {
+public class CounterCell implements MetricCell<Long> {
 
   private final DirtyState dirty = new DirtyState();
   private final AtomicLong value = new AtomicLong();
@@ -57,28 +57,11 @@ public class CounterCell implements MetricCell<Counter, Long>, Counter {
     return value.get();
   }
 
-  @Override
-  public Counter getInterface() {
-    return this;
-  }
-
-  @Override
   public void inc() {
     add(1);
   }
 
-  @Override
   public void inc(long n) {
     add(n);
   }
-
-  @Override
-  public void dec() {
-    add(-1);
-  }
-
-  @Override
-  public void dec(long n) {
-    add(-n);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
index 7f684a8..0f3f6a4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
@@ -30,11 +30,11 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * of indirection.
  */
 @Experimental(Kind.METRICS)
-public class DistributionCell implements MetricCell<Distribution, 
DistributionData>, Distribution {
+public class DistributionCell implements MetricCell<DistributionData> {
 
   private final DirtyState dirty = new DirtyState();
   private final AtomicReference<DistributionData> value =
-      new AtomicReference<DistributionData>(DistributionData.EMPTY);
+      new AtomicReference<>(DistributionData.EMPTY);
 
   /**
    * Package-visibility because all {@link DistributionCell DistributionCells} 
should be created by
@@ -43,7 +43,6 @@ public class DistributionCell implements MetricCell<Distribution, DistributionDa
   DistributionCell() {}
 
   /** Increment the counter by the given amount. */
-  @Override
   public void update(long n) {
     DistributionData original;
     do {
@@ -61,10 +60,5 @@ public class DistributionCell implements MetricCell<Distribution, DistributionDa
   public DistributionData getCumulative() {
     return value.get();
   }
-
-  @Override
-  public Distribution getInterface() {
-    return this;
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
index 35ae822..6f8e880 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
@@ -29,12 +29,11 @@ import org.apache.beam.sdk.annotations.Experimental;
  * of indirection.
  */
 @Experimental(Experimental.Kind.METRICS)
-public class GaugeCell implements MetricCell<Gauge, GaugeData>, Gauge {
+public class GaugeCell implements MetricCell<GaugeData> {
 
   private final DirtyState dirty = new DirtyState();
   private final AtomicReference<GaugeData> gaugeValue = new 
AtomicReference<>(GaugeData.empty());
 
-  @Override
   public void set(long value) {
     GaugeData original;
     do {
@@ -52,9 +51,4 @@ public class GaugeCell implements MetricCell<Gauge, GaugeData>, Gauge {
   public GaugeData getCumulative() {
     return gaugeValue.get();
   }
-
-  @Override
-  public Gauge getInterface() {
-    return this;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
index 37a5f65..dcd8a04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
@@ -21,4 +21,10 @@ package org.apache.beam.sdk.metrics;
 /**
  * Marker interface for all user-facing metrics.
  */
-public interface Metric { }
+public interface Metric {
+
+  /**
+   * The {@link MetricName} given to this metric.
+   */
+  MetricName getName();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
index 7cf9710..82e30cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
@@ -24,11 +24,10 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * A {@link MetricCell} is used for accumulating in-memory changes to a 
metric. It represents a
  * specific metric name in a single context.
  *
- * @param <UserT> The type of the user interface for reporting changes to this 
cell.
  * @param <DataT> The type of metric data stored (and extracted) from this 
cell.
  */
 @Experimental(Kind.METRICS)
-public interface MetricCell<UserT extends Metric, DataT> {
+public interface MetricCell<DataT> {
 
   /**
    * Return the {@link DirtyState} tracking whether this metric cell contains 
uncommitted changes.
@@ -39,9 +38,4 @@ public interface MetricCell<UserT extends Metric, DataT> {
    * Return the cumulative value of this metric.
    */
   DataT getCumulative();
-
-  /**
-   * Return the user-facing mutator for this cell.
-   */
-  UserT getInterface();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 587241a..9286ea9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -131,6 +131,10 @@ public class Metrics {
     @Override public void dec(long n) {
       inc(-1 * n);
     }
+
+    @Override public MetricName getName() {
+      return name;
+    }
   }
 
   /**
@@ -150,6 +154,10 @@ public class Metrics {
         container.getDistribution(name).update(value);
       }
     }
+
+    @Override public MetricName getName() {
+      return name;
+    }
   }
 
   /**
@@ -169,5 +177,9 @@ public class Metrics {
         container.getGauge(name).set(value);
       }
     }
+
+    @Override public MetricName getName() {
+      return name;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index 5812ec6..fbb0da3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -96,7 +96,7 @@ public class MetricsContainer {
     return gauges.get(metricName);
   }
 
-  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+  private <UpdateT, CellT extends MetricCell<UpdateT>>
   ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
       MetricsMap<MetricName, CellT> cells) {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = 
ImmutableList.builder();
@@ -120,8 +120,8 @@ public class MetricsContainer {
         extractUpdates(gauges));
   }
 
-  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, 
?>> cells) {
-    for (MetricCell<?, ?> cell : cells.values()) {
+  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> 
cells) {
+    for (MetricCell<?> cell : cells.values()) {
       cell.getDirty().afterCommit();
     }
   }
@@ -135,7 +135,7 @@ public class MetricsContainer {
     commitUpdates(distributions);
   }
 
-  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+  private <UpdateT, CellT extends MetricCell<UpdateT>>
   ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
       MetricsMap<MetricName, CellT> cells) {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = 
ImmutableList.builder();

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java
new file mode 100644
index 0000000..f96b6ac
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * Standard Sink Metrics.
+ */
+public class SinkMetrics {
+
+  private static final String SINK_NAMESPACE = "sink";
+
+  private static final String ELEMENTS_WRITTEN = "elements_written";
+  private static final String BYTES_WRITTEN = "bytes_written";
+
+  private static final Counter ELEMENTS_WRITTEN_COUNTER =
+      Metrics.counter(SINK_NAMESPACE, ELEMENTS_WRITTEN);
+  private static final Counter BYTES_WRITTEN_COUNTER =
+      Metrics.counter(SINK_NAMESPACE, BYTES_WRITTEN);
+
+  /**
+   * Counter of elements written to a sink.
+   */
+  public static Counter elementsWritten() {
+    return ELEMENTS_WRITTEN_COUNTER;
+  }
+
+  /**
+   * Counter of bytes written to a sink.
+   */
+  public static Counter bytesWritten() {
+    return BYTES_WRITTEN_COUNTER;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java
new file mode 100644
index 0000000..4479f3a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Standard {@link org.apache.beam.sdk.io.Source} Metrics.
+ */
+public class SourceMetrics {
+
+  private static final String SOURCE_NAMESPACE = "source";
+  private static final String SOURCE_SPLITS_NAMESPACE = "source.splits";
+  private static final String SEPARATOR = ".";
+
+  private static final String ELEMENTS_READ = "elements_read";
+  private static final String BYTES_READ = "bytes_read";
+  private static final String BACKLOG_BYTES = "backlog_bytes";
+  private static final String BACKLOG_ELEMENTS = "backlog_elements";
+
+  private static final Counter ELEMENTS_READ_COUNTER =
+      Metrics.counter(SOURCE_NAMESPACE, ELEMENTS_READ);
+  private static final Counter BYTES_READ_COUNTER =
+      Metrics.counter(SOURCE_NAMESPACE, BYTES_READ);
+  private static final Gauge BACKLOG_BYTES_GAUGE =
+      Metrics.gauge(SOURCE_NAMESPACE, BACKLOG_BYTES);
+  private static final Gauge BACKLOG_ELEMENTS_GAUGE =
+      Metrics.gauge(SOURCE_NAMESPACE, BACKLOG_ELEMENTS);
+
+  /**
+   * Counter of elements read by a source.
+   */
+  public static Counter elementsRead() {
+    return ELEMENTS_READ_COUNTER;
+  }
+
+  /**
+   * Counter of elements read by a source split.
+   *
+   * <p>Should only be used when there is a small, fixed set of split IDs so 
as not to overload
+   * metrics backends.</p>
+   */
+  public static Counter elementsReadBySplit(String splitId) {
+    return Metrics.counter(SOURCE_SPLITS_NAMESPACE, renderName(splitId, 
ELEMENTS_READ));
+  }
+
+  /**
+   * Counter of bytes read by a source.
+   */
+  public static Counter bytesRead() {
+    return BYTES_READ_COUNTER;
+  }
+
+  /**
+   * Counter of bytes read by a source split.
+   *
+   * <p>Should only be used when there is a small, fixed set of split IDs so 
as not to overload
+   * metrics backends.</p>
+   */
+  public static Counter bytesReadBySplit(String splitId) {
+    return Metrics.counter(SOURCE_SPLITS_NAMESPACE, renderName(splitId, 
BYTES_READ));
+  }
+
+  /**
+   * Gauge for source backlog in bytes.
+   */
+  public static Gauge backlogBytes() {
+    return BACKLOG_BYTES_GAUGE;
+  }
+
+  /**
+   * Gauge for source split backlog in bytes.
+   *
+   * <p>Should only be used when there is a small, fixed set of split IDs so 
as not to overload
+   * metrics backends.</p>
+   */
+  public static Gauge backlogBytesOfSplit(String splitId) {
+    return Metrics.gauge(SOURCE_SPLITS_NAMESPACE, renderName(splitId, 
BACKLOG_BYTES));
+  }
+
+  /**
+   * Gauge for source backlog in elements.
+   */
+  public static Gauge backlogElements() {
+    return BACKLOG_ELEMENTS_GAUGE;
+  }
+
+  /**
+   * Gauge for source split backlog in elements.
+   *
+   * <p>Should only be used when there is a small, fixed set of split IDs so 
as not to overload
+   * metrics backends.</p>
+   */
+  public static Gauge backlogElementsOfSplit(String splitId) {
+    return Metrics.gauge(SOURCE_SPLITS_NAMESPACE, renderName(splitId, 
BACKLOG_ELEMENTS));
+  }
+
+  private static String renderName(String... nameParts) {
+    return Joiner.on(SEPARATOR).join(nameParts);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index a093e89..c7068e1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -56,6 +56,7 @@ public class MetricsTest implements Serializable {
   private static final String NAME = "name";
   private static final MetricName METRIC_NAME = MetricName.named(NS, NAME);
   private static final String NAMESPACE = MetricsTest.class.getName();
+  private static final MetricName ELEMENTS_READ = 
SourceMetrics.elementsRead().getName();
 
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
@@ -245,11 +246,16 @@ public class MetricsTest implements Serializable {
             .metrics()
             .queryMetrics(
                 MetricsFilter.builder()
-                    .addNameFilter(MetricNameFilter.named("io", 
"elementsRead"))
+                    .addNameFilter(
+                        MetricNameFilter.named(ELEMENTS_READ.namespace(), 
ELEMENTS_READ.name()))
                     .build());
 
     assertThat(metrics.counters(), hasItem(
-        attemptedMetricsResult("io", "elementsRead", 
"Read(BoundedCountingSource)", 1000L)));
+        attemptedMetricsResult(
+            ELEMENTS_READ.namespace(),
+            ELEMENTS_READ.name(),
+            "Read(BoundedCountingSource)",
+            1000L)));
   }
 
   @Test
@@ -269,10 +275,15 @@ public class MetricsTest implements Serializable {
             .metrics()
             .queryMetrics(
                 MetricsFilter.builder()
-                    .addNameFilter(MetricNameFilter.named("io", 
"elementsRead"))
+                    .addNameFilter(
+                        MetricNameFilter.named(ELEMENTS_READ.namespace(), 
ELEMENTS_READ.name()))
                     .build());
 
     assertThat(metrics.counters(), hasItem(
-        attemptedMetricsResult("io", "elementsRead", 
"Read(UnboundedCountingSource)", 1000L)));
+        attemptedMetricsResult(
+            ELEMENTS_READ.namespace(),
+            ELEMENTS_READ.name(),
+            "Read(UnboundedCountingSource)",
+            1000L)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 2b4cd71..fd5f396 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -48,7 +48,7 @@ class StreamingWriteFn
   private transient Map<String, List<String>> uniqueIdsForTableRows;
 
   /** Tracks bytes written, exposed as "ByteCount" Counter. */
-  private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, 
"ByteCount");
+  private Counter byteCounter = SinkMetrics.bytesWritten();
 
   StreamingWriteFn(BigQueryServices bqServices) {
     this.bqServices = bqServices;

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index cf43ae6..002e979 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -235,8 +236,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     private transient PubsubClient pubsubClient;
 
     private final Counter batchCounter = Metrics.counter(WriterFn.class, 
"batches");
-    private final Counter elementCounter = Metrics.counter(WriterFn.class, 
"elements");
-    private final Counter byteCounter = Metrics.counter(WriterFn.class, 
"bytes");
+    private final Counter elementCounter = SinkMetrics.elementsWritten();
+    private final Counter byteCounter = SinkMetrics.bytesWritten();
 
     WriterFn(
         PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,

http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 4979939..b16b665 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -59,7 +59,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Combine;
@@ -1197,7 +1197,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // 
================================================================================
 
   private static class StatsFn<T> extends DoFn<T, T> {
-    private final Counter elementCounter = Metrics.counter(StatsFn.class, 
"elements");
+    private final Counter elementCounter = SourceMetrics.elementsRead();
 
     private final PubsubClientFactory pubsubFactory;
     @Nullable

Reply via email to