Repository: beam Updated Branches: refs/heads/master f3cff3695 -> 30dbaf891
[BEAM-1958] Standard IO Metrics in Java SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/41d52be0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/41d52be0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/41d52be0 Branch: refs/heads/master Commit: 41d52be0ec64c83a79d97bfd3c27eb104b546991 Parents: f3cff36 Author: Aviem Zur <aviem...@gmail.com> Authored: Thu Apr 13 20:27:33 2017 +0300 Committer: Aviem Zur <aviem...@gmail.com> Committed: Wed Apr 26 19:53:29 2017 +0300 ---------------------------------------------------------------------- .../streaming/StreamingSourceMetricsTest.java | 12 +- .../org/apache/beam/sdk/io/CountingSource.java | 6 +- .../apache/beam/sdk/metrics/CounterCell.java | 19 +-- .../beam/sdk/metrics/DistributionCell.java | 10 +- .../org/apache/beam/sdk/metrics/GaugeCell.java | 8 +- .../org/apache/beam/sdk/metrics/Metric.java | 8 +- .../org/apache/beam/sdk/metrics/MetricCell.java | 8 +- .../org/apache/beam/sdk/metrics/Metrics.java | 12 ++ .../beam/sdk/metrics/MetricsContainer.java | 8 +- .../apache/beam/sdk/metrics/SinkMetrics.java | 49 ++++++++ .../apache/beam/sdk/metrics/SourceMetrics.java | 116 +++++++++++++++++++ .../apache/beam/sdk/metrics/MetricsTest.java | 19 ++- .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 4 +- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 5 +- .../io/gcp/pubsub/PubsubUnboundedSource.java | 4 +- 15 files changed, 228 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java index 80f7f53..5a4b1b5 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java @@ -29,9 +29,11 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.SourceMetrics; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; @@ -41,6 +43,7 @@ import org.junit.Test; * Verify metrics support for {@link Source Sources} in streaming pipelines. */ public class StreamingSourceMetricsTest implements Serializable { + private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName(); // Force streaming pipeline using pipeline rule. @Rule @@ -65,10 +68,15 @@ public class StreamingSourceMetricsTest implements Serializable { .metrics() .queryMetrics( MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named("io", "elementsRead")) + .addNameFilter( + MetricNameFilter.named(ELEMENTS_READ.namespace(), ELEMENTS_READ.name())) .build()); assertThat(metrics.counters(), hasItem( - attemptedMetricsResult("io", "elementsRead", "Read(UnboundedCountingSource)", 1000L))); + attemptedMetricsResult( + ELEMENTS_READ.namespace(), + ELEMENTS_READ.name(), + "Read(UnboundedCountingSource)", + 1000L))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index b66a8b2..81082e5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; @@ -190,7 +190,7 @@ public class CountingSource { private static class BoundedCountingReader extends OffsetBasedSource.OffsetBasedReader<Long> { private long current; - private final Counter elementsRead = Metrics.counter("io", "elementsRead"); + private final Counter elementsRead = SourceMetrics.elementsRead(); public BoundedCountingReader(OffsetBasedSource<Long> source) { super(source); @@ -354,7 +354,7 @@ public class CountingSource { private Instant currentTimestamp; private Instant firstStarted; - private final Counter elementsRead = Metrics.counter("io", "elementsRead"); + private final Counter elementsRead = SourceMetrics.elementsRead(); public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) { this.source = source; http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java index 93700e6..7ab5ebc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * indirection. */ @Experimental(Kind.METRICS) -public class CounterCell implements MetricCell<Counter, Long>, Counter { +public class CounterCell implements MetricCell<Long> { private final DirtyState dirty = new DirtyState(); private final AtomicLong value = new AtomicLong(); @@ -57,28 +57,11 @@ public class CounterCell implements MetricCell<Counter, Long>, Counter { return value.get(); } - @Override - public Counter getInterface() { - return this; - } - - @Override public void inc() { add(1); } - @Override public void inc(long n) { add(n); } - - @Override - public void dec() { - add(-1); - } - - @Override - public void dec(long n) { - add(-n); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java index 7f684a8..0f3f6a4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java @@ -30,11 +30,11 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * of indirection. */ @Experimental(Kind.METRICS) -public class DistributionCell implements MetricCell<Distribution, DistributionData>, Distribution { +public class DistributionCell implements MetricCell<DistributionData> { private final DirtyState dirty = new DirtyState(); private final AtomicReference<DistributionData> value = - new AtomicReference<DistributionData>(DistributionData.EMPTY); + new AtomicReference<>(DistributionData.EMPTY); /** * Package-visibility because all {@link DistributionCell DistributionCells} should be created by @@ -43,7 +43,6 @@ public class DistributionCell implements MetricCell<Distribution, DistributionDa DistributionCell() {} /** Increment the counter by the given amount. */ - @Override public void update(long n) { DistributionData original; do { @@ -61,10 +60,5 @@ public class DistributionCell implements MetricCell<Distribution, DistributionDa public DistributionData getCumulative() { return value.get(); } - - @Override - public Distribution getInterface() { - return this; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java index 35ae822..6f8e880 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java @@ -29,12 +29,11 @@ import org.apache.beam.sdk.annotations.Experimental; * of indirection. */ @Experimental(Experimental.Kind.METRICS) -public class GaugeCell implements MetricCell<Gauge, GaugeData>, Gauge { +public class GaugeCell implements MetricCell<GaugeData> { private final DirtyState dirty = new DirtyState(); private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty()); - @Override public void set(long value) { GaugeData original; do { @@ -52,9 +51,4 @@ public class GaugeCell implements MetricCell<Gauge, GaugeData>, Gauge { public GaugeData getCumulative() { return gaugeValue.get(); } - - @Override - public Gauge getInterface() { - return this; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java index 37a5f65..dcd8a04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java @@ -21,4 +21,10 @@ package org.apache.beam.sdk.metrics; /** * Marker interface for all user-facing metrics. */ -public interface Metric { } +public interface Metric { + + /** + * The {@link MetricName} given to this metric. + */ + MetricName getName(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java index 7cf9710..82e30cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java @@ -24,11 +24,10 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a * specific metric name in a single context. * - * @param <UserT> The type of the user interface for reporting changes to this cell. * @param <DataT> The type of metric data stored (and extracted) from this cell. */ @Experimental(Kind.METRICS) -public interface MetricCell<UserT extends Metric, DataT> { +public interface MetricCell<DataT> { /** * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes. @@ -39,9 +38,4 @@ public interface MetricCell<UserT extends Metric, DataT> { * Return the cumulative value of this metric. */ DataT getCumulative(); - - /** - * Return the user-facing mutator for this cell. - */ - UserT getInterface(); } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index 587241a..9286ea9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -131,6 +131,10 @@ public class Metrics { @Override public void dec(long n) { inc(-1 * n); } + + @Override public MetricName getName() { + return name; + } } /** @@ -150,6 +154,10 @@ public class Metrics { container.getDistribution(name).update(value); } } + + @Override public MetricName getName() { + return name; + } } /** @@ -169,5 +177,9 @@ public class Metrics { container.getGauge(name).set(value); } } + + @Override public MetricName getName() { + return name; + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index 5812ec6..fbb0da3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -96,7 +96,7 @@ public class MetricsContainer { return gauges.get(metricName); } - private <UpdateT, CellT extends MetricCell<?, UpdateT>> + private <UpdateT, CellT extends MetricCell<UpdateT>> ImmutableList<MetricUpdate<UpdateT>> extractUpdates( MetricsMap<MetricName, CellT> cells) { ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); @@ -120,8 +120,8 @@ public class MetricsContainer { extractUpdates(gauges)); } - private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) { - for (MetricCell<?, ?> cell : cells.values()) { + private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells) { + for (MetricCell<?> cell : cells.values()) { cell.getDirty().afterCommit(); } } @@ -135,7 +135,7 @@ public class MetricsContainer { commitUpdates(distributions); } - private <UpdateT, CellT extends MetricCell<?, UpdateT>> + private <UpdateT, CellT extends MetricCell<UpdateT>> ImmutableList<MetricUpdate<UpdateT>> extractCumulatives( MetricsMap<MetricName, CellT> cells) { ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java new file mode 100644 index 0000000..f96b6ac --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +/** + * Standard Sink Metrics. + */ +public class SinkMetrics { + + private static final String SINK_NAMESPACE = "sink"; + + private static final String ELEMENTS_WRITTEN = "elements_written"; + private static final String BYTES_WRITTEN = "bytes_written"; + + private static final Counter ELEMENTS_WRITTEN_COUNTER = + Metrics.counter(SINK_NAMESPACE, ELEMENTS_WRITTEN); + private static final Counter BYTES_WRITTEN_COUNTER = + Metrics.counter(SINK_NAMESPACE, BYTES_WRITTEN); + + /** + * Counter of elements written to a sink. + */ + public static Counter elementsWritten() { + return ELEMENTS_WRITTEN_COUNTER; + } + + /** + * Counter of bytes written to a sink. + */ + public static Counter bytesWritten() { + return BYTES_WRITTEN_COUNTER; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java new file mode 100644 index 0000000..4479f3a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.common.base.Joiner; + +/** + * Standard {@link org.apache.beam.sdk.io.Source} Metrics. + */ +public class SourceMetrics { + + private static final String SOURCE_NAMESPACE = "source"; + private static final String SOURCE_SPLITS_NAMESPACE = "source.splits"; + private static final String SEPARATOR = "."; + + private static final String ELEMENTS_READ = "elements_read"; + private static final String BYTES_READ = "bytes_read"; + private static final String BACKLOG_BYTES = "backlog_bytes"; + private static final String BACKLOG_ELEMENTS = "backlog_elements"; + + private static final Counter ELEMENTS_READ_COUNTER = + Metrics.counter(SOURCE_NAMESPACE, ELEMENTS_READ); + private static final Counter BYTES_READ_COUNTER = + Metrics.counter(SOURCE_NAMESPACE, BYTES_READ); + private static final Gauge BACKLOG_BYTES_GAUGE = + Metrics.gauge(SOURCE_NAMESPACE, BACKLOG_BYTES); + private static final Gauge BACKLOG_ELEMENTS_GAUGE = + Metrics.gauge(SOURCE_NAMESPACE, BACKLOG_ELEMENTS); + + /** + * Counter of elements read by a source. + */ + public static Counter elementsRead() { + return ELEMENTS_READ_COUNTER; + } + + /** + * Counter of elements read by a source split. + * + * <p>Should only be used when there is a small, fixed set of split IDs so as not to overload + * metrics backends.</p> + */ + public static Counter elementsReadBySplit(String splitId) { + return Metrics.counter(SOURCE_SPLITS_NAMESPACE, renderName(splitId, ELEMENTS_READ)); + } + + /** + * Counter of bytes read by a source. + */ + public static Counter bytesRead() { + return BYTES_READ_COUNTER; + } + + /** + * Counter of bytes read by a source split. + * + * <p>Should only be used when there is a small, fixed set of split IDs so as not to overload + * metrics backends.</p> + */ + public static Counter bytesReadBySplit(String splitId) { + return Metrics.counter(SOURCE_SPLITS_NAMESPACE, renderName(splitId, BYTES_READ)); + } + + /** + * Gauge for source backlog in bytes. + */ + public static Gauge backlogBytes() { + return BACKLOG_BYTES_GAUGE; + } + + /** + * Gauge for source split backlog in bytes. + * + * <p>Should only be used when there is a small, fixed set of split IDs so as not to overload + * metrics backends.</p> + */ + public static Gauge backlogBytesOfSplit(String splitId) { + return Metrics.gauge(SOURCE_SPLITS_NAMESPACE, renderName(splitId, BACKLOG_BYTES)); + } + + /** + * Gauge for source backlog in elements. + */ + public static Gauge backlogElements() { + return BACKLOG_ELEMENTS_GAUGE; + } + + /** + * Gauge for source split backlog in elements. + * + * <p>Should only be used when there is a small, fixed set of split IDs so as not to overload + * metrics backends.</p> + */ + public static Gauge backlogElementsOfSplit(String splitId) { + return Metrics.gauge(SOURCE_SPLITS_NAMESPACE, renderName(splitId, BACKLOG_ELEMENTS)); + } + + private static String renderName(String... nameParts) { + return Joiner.on(SEPARATOR).join(nameParts); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index a093e89..c7068e1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -56,6 +56,7 @@ public class MetricsTest implements Serializable { private static final String NAME = "name"; private static final MetricName METRIC_NAME = MetricName.named(NS, NAME); private static final String NAMESPACE = MetricsTest.class.getName(); + private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName(); @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -245,11 +246,16 @@ public class MetricsTest implements Serializable { .metrics() .queryMetrics( MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named("io", "elementsRead")) + .addNameFilter( + MetricNameFilter.named(ELEMENTS_READ.namespace(), ELEMENTS_READ.name())) .build()); assertThat(metrics.counters(), hasItem( - attemptedMetricsResult("io", "elementsRead", "Read(BoundedCountingSource)", 1000L))); + attemptedMetricsResult( + ELEMENTS_READ.namespace(), + ELEMENTS_READ.name(), + "Read(BoundedCountingSource)", + 1000L))); } @Test @@ -269,10 +275,15 @@ public class MetricsTest implements Serializable { .metrics() .queryMetrics( MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named("io", "elementsRead")) + .addNameFilter( + MetricNameFilter.named(ELEMENTS_READ.namespace(), ELEMENTS_READ.name())) .build()); assertThat(metrics.counters(), hasItem( - attemptedMetricsResult("io", "elementsRead", "Read(UnboundedCountingSource)", 1000L))); + attemptedMetricsResult( + ELEMENTS_READ.namespace(), + ELEMENTS_READ.name(), + "Read(UnboundedCountingSource)", + 1000L))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java index 2b4cd71..fd5f396 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.SystemDoFnInternal; @@ -48,7 +48,7 @@ class StreamingWriteFn private transient Map<String, List<String>> uniqueIdsForTableRows; /** Tracks bytes written, exposed as "ByteCount" Counter. */ - private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount"); + private Counter byteCounter = SinkMetrics.bytesWritten(); StreamingWriteFn(BigQueryServices bqServices) { this.bqServices = bqServices; http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index cf43ae6..002e979 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; @@ -235,8 +236,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { private transient PubsubClient pubsubClient; private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches"); - private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements"); - private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes"); + private final Counter elementCounter = SinkMetrics.elementsWritten(); + private final Counter byteCounter = SinkMetrics.bytesWritten(); WriterFn( PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic, http://git-wip-us.apache.org/repos/asf/beam/blob/41d52be0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 4979939..b16b665 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -59,7 +59,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Combine; @@ -1197,7 +1197,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> // ================================================================================ private static class StatsFn<T> extends DoFn<T, T> { - private final Counter elementCounter = Metrics.counter(StatsFn.class, "elements"); + private final Counter elementCounter = SourceMetrics.elementsRead(); private final PubsubClientFactory pubsubFactory; @Nullable