Repository: beam Updated Branches: refs/heads/master 8d240981b -> 4ffd43ed7
Remove Aggregators from BigQuery and PubSub Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/695936ff Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/695936ff Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/695936ff Branch: refs/heads/master Commit: 695936ffaac03799d4ee972fb99b73202582e7fa Parents: 8d24098 Author: Pablo <[email protected]> Authored: Mon Mar 20 15:39:53 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Mar 20 17:15:35 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/PubsubUnboundedSink.java | 24 ++++++++------------ .../beam/sdk/io/PubsubUnboundedSource.java | 8 +++---- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 13 ++++------- 3 files changed, 19 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/695936ff/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index c726fd7..f41b5b7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -45,15 +45,15 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.PubsubIO.PubsubMessage; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.AfterFirst; @@ -164,8 +164,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { * Convert elements to messages and shard them. */ private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> { - private final Aggregator<Long, Long> elementCounter = - createAggregator("elements", Sum.ofLongs()); + private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements"); private final Coder<T> elementCoder; private final int numShards; private final RecordIdMethod recordIdMethod; @@ -181,7 +180,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { @ProcessElement public void processElement(ProcessContext c) throws Exception { - elementCounter.addValue(1L); + elementCounter.inc(); byte[] elementBytes = null; Map<String, String> attributes = ImmutableMap.<String, String>of(); if (formatFn != null) { @@ -242,12 +241,9 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { @Nullable private transient PubsubClient pubsubClient; - private final Aggregator<Long, Long> batchCounter = - createAggregator("batches", Sum.ofLongs()); - private final Aggregator<Long, Long> elementCounter = - createAggregator("elements", Sum.ofLongs()); - private final Aggregator<Long, Long> byteCounter = - createAggregator("bytes", Sum.ofLongs()); + private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches"); + private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements"); + private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes"); WriterFn( PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic, @@ -269,9 +265,9 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { int n = pubsubClient.publish(topic.get(), messages); checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", messages.size(), n); - batchCounter.addValue(1L); - elementCounter.addValue((long) messages.size()); - byteCounter.addValue((long) bytes); + batchCounter.inc(); + elementCounter.inc(messages.size()); + byteCounter.inc(bytes); } @StartBundle http://git-wip-us.apache.org/repos/asf/beam/blob/695936ff/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index 6c8a788..90bcc76 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -51,10 +51,11 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.PubsubIO.PubsubMessage; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -1169,8 +1170,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> // ================================================================================ private static class StatsFn<T> extends DoFn<T, T> { - private final Aggregator<Long, Long> elementCounter = - createAggregator("elements", Sum.ofLongs()); + private final Counter elementCounter = Metrics.counter(StatsFn.class, "elements"); private final PubsubClientFactory pubsubFactory; @Nullable @@ -1198,7 +1198,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @ProcessElement public void processElement(ProcessContext c) throws Exception { - elementCounter.addValue(1L); + elementCounter.inc(); c.output(c.element()); } http://git-wip-us.apache.org/repos/asf/beam/blob/695936ff/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e9ea0e0..03e18e6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -86,6 +86,8 @@ import org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -93,7 +95,6 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; @@ -102,7 +103,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -118,7 +118,6 @@ import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.Reshuffle; -import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; @@ -2436,7 +2435,6 @@ public class BigQueryIO { /** * Implementation of DoFn to perform streaming BigQuery write. */ - @SystemDoFnInternal @VisibleForTesting static class StreamingWriteFn extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { @@ -2460,9 +2458,8 @@ public class BigQueryIO { private static Set<String> createdTables = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); - /** Tracks bytes written, exposed as "ByteCount" Counter. */ - private Aggregator<Long, Long> byteCountAggregator = - createAggregator("ByteCount", Sum.ofLongs()); + /** Tracks bytes written, exposed as "ByteCount" Metrics Counter. */ + private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount"); /** Constructor. */ StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema, @@ -2564,7 +2561,7 @@ public class BigQueryIO { try { long totalBytes = bqServices.getDatasetService(options).insertAll( tableReference, tableRows, uniqueIds); - byteCountAggregator.addValue(totalBytes); + byteCounter.inc(totalBytes); } catch (IOException e) { throw new RuntimeException(e); }
