Remove Accumulators and switch to the Metrics API. Fix compile after sideOutput and split refactor.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b438fa7d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b438fa7d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b438fa7d Branch: refs/heads/master Commit: b438fa7df16e5181f73b6103ac2f57430cd9e6f3 Parents: e10d578 Author: Ismaël Mejía <[email protected]> Authored: Wed Apr 19 11:22:42 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 6 +- .../beam/integration/nexmark/Monitor.java | 77 ++-- .../beam/integration/nexmark/NexmarkQuery.java | 16 +- .../beam/integration/nexmark/NexmarkRunner.java | 129 +++++-- .../beam/integration/nexmark/NexmarkUtils.java | 107 +++--- .../beam/integration/nexmark/WinningBids.java | 102 +++--- .../nexmark/drivers/NexmarkGoogleRunner.java | 4 +- .../integration/nexmark/queries/Query0.java | 10 +- .../integration/nexmark/queries/Query10.java | 363 +++++++++---------- .../integration/nexmark/queries/Query3.java | 73 ++-- .../nexmark/sources/BoundedEventSource.java | 2 +- .../nexmark/sources/UnboundedEventSource.java | 2 +- .../nexmark/sources/BoundedEventSourceTest.java | 2 +- 13 files changed, 448 insertions(+), 445 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index 67d6117..103c18f 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -29,7 +29,6 @@ <artifactId>beam-integration-java-nexmark</artifactId> <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name> - <packaging>jar</packaging> <properties> @@ -227,6 +226,11 @@ 
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + </dependency> + <!-- Extra libraries --> <dependency> <groupId>com.google.apis</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java index 6370e41..cb4d71c 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java @@ -20,54 +20,55 @@ package org.apache.beam.integration.nexmark; import java.io.Serializable; import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PCollection; /** - * A monitor of elements with support for later retrieving their aggregators. + * A monitor of elements with support for later retrieving their metrics. * * @param <T> Type of element we are monitoring. 
*/ public class Monitor<T extends KnownSize> implements Serializable { private class MonitorDoFn extends DoFn<T, T> { - public final Aggregator<Long, Long> elementCounter = - createAggregator(counterNamePrefix + "_elements", Sum.ofLongs()); - public final Aggregator<Long, Long> bytesCounter = - createAggregator(counterNamePrefix + "_bytes", Sum.ofLongs()); - public final Aggregator<Long, Long> startTime = - createAggregator(counterNamePrefix + "_startTime", Min.ofLongs()); - public final Aggregator<Long, Long> endTime = - createAggregator(counterNamePrefix + "_endTime", Max.ofLongs()); - public final Aggregator<Long, Long> startTimestamp = - createAggregator("startTimestamp", Min.ofLongs()); - public final Aggregator<Long, Long> endTimestamp = - createAggregator("endTimestamp", Max.ofLongs()); + final Counter elementCounter = + Metrics.counter(name , prefix + ".elements"); + final Counter bytesCounter = + Metrics.counter(name , prefix + ".bytes"); + final Distribution startTime = + Metrics.distribution(name , prefix + ".startTime"); + final Distribution endTime = + Metrics.distribution(name , prefix + ".endTime"); + final Distribution startTimestamp = + Metrics.distribution(name , prefix + ".startTimestamp"); + final Distribution endTimestamp = + Metrics.distribution(name , prefix + ".endTimestamp"); @ProcessElement public void processElement(ProcessContext c) { - elementCounter.addValue(1L); - bytesCounter.addValue(c.element().sizeInBytes()); + elementCounter.inc(); + bytesCounter.inc(c.element().sizeInBytes()); long now = System.currentTimeMillis(); - startTime.addValue(now); - endTime.addValue(now); - startTimestamp.addValue(c.timestamp().getMillis()); - endTimestamp.addValue(c.timestamp().getMillis()); + startTime.update(now); + endTime.update(now); + startTimestamp.update(c.timestamp().getMillis()); + endTimestamp.update(c.timestamp().getMillis()); c.output(c.element()); } } + public final String name; + public final String prefix; final MonitorDoFn doFn; 
final PTransform<PCollection<? extends T>, PCollection<T>> transform; - private String counterNamePrefix; - public Monitor(String name, String counterNamePrefix) { - this.counterNamePrefix = counterNamePrefix; + public Monitor(String name, String prefix) { + this.name = name; + this.prefix = prefix; doFn = new MonitorDoFn(); transform = ParDo.of(doFn); } @@ -75,28 +76,4 @@ public class Monitor<T extends KnownSize> implements Serializable { public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() { return transform; } - - public Aggregator<Long, Long> getElementCounter() { - return doFn.elementCounter; - } - - public Aggregator<Long, Long> getBytesCounter() { - return doFn.bytesCounter; - } - - public Aggregator<Long, Long> getStartTime() { - return doFn.startTime; - } - - public Aggregator<Long, Long> getEndTime() { - return doFn.endTime; - } - - public Aggregator<Long, Long> getStartTimestamp() { - return doFn.startTimestamp; - } - - public Aggregator<Long, Long> getEndTimestamp() { - return doFn.endTimestamp; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java index e1cd493..ab1c305 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java @@ -17,13 +17,13 @@ */ package org.apache.beam.integration.nexmark; -import javax.annotation.Nullable; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; import 
org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.PTransform; @@ -206,6 +206,7 @@ public abstract class NexmarkQuery public final Monitor<Event> eventMonitor; public final Monitor<KnownSize> resultMonitor; public final Monitor<Event> endOfStreamMonitor; + protected final Counter fatalCounter; protected NexmarkQuery(NexmarkConfiguration configuration, String name) { super(name); @@ -214,23 +215,16 @@ public abstract class NexmarkQuery eventMonitor = new Monitor<>(name + ".Events", "event"); resultMonitor = new Monitor<>(name + ".Results", "result"); endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end"); + fatalCounter = Metrics.counter(name , "fatal"); } else { eventMonitor = null; resultMonitor = null; endOfStreamMonitor = null; + fatalCounter = null; } } /** - * Return the aggregator which counts fatal errors in this query. Return null if no such - * aggregator. - */ - @Nullable - public Aggregator<Long, Long> getFatalCount() { - return null; - } - - /** * Implement the actual query. All we know about the result is it has a known encoded size. 
*/ protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events); http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index ef5f0e2..87314ce 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -24,14 +24,13 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.integration.nexmark.io.PubsubHelper; @@ -63,15 +62,18 @@ import org.apache.beam.integration.nexmark.queries.Query8; import org.apache.beam.integration.nexmark.queries.Query8Model; import org.apache.beam.integration.nexmark.queries.Query9; import org.apache.beam.integration.nexmark.queries.Query9Model; -import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import 
org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -186,38 +188,59 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { protected abstract int maxNumWorkers(); /** - * Return the current value for a long counter, or -1 if can't be retrieved. + * Return the current value for a long counter, or a default value if can't be retrieved. + * Note this uses only attempted metrics because some runners don't support committed metrics. */ - protected long getLong(PipelineResult job, Aggregator<Long, Long> aggregator) { + protected long getCounterMetric(PipelineResult result, String namespace, String name, + long defaultValue) { + //TODO Ismael calc this only once + MetricQueryResults metrics = result.metrics().queryMetrics( + MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); + Iterable<MetricResult<Long>> counters = metrics.counters(); try { - Collection<Long> values = job.getAggregatorValues(aggregator).getValues(); - if (values.size() != 1) { - return -1; - } - return Iterables.getOnlyElement(values); - } catch (AggregatorRetrievalException e) { - return -1; + MetricResult<Long> metricResult = counters.iterator().next(); + return metricResult.attempted(); + } catch (NoSuchElementException e) { + //TODO Ismael } + return defaultValue; } /** - * Return the current value for a time counter, or -1 if can't be retrieved. + * Return the current value for a long counter, or a default value if can't be retrieved. 
+ * Note this uses only attempted metrics because some runners don't support committed metrics. */ - protected long getTimestamp( - long now, PipelineResult job, Aggregator<Long, Long> aggregator) { + protected long getDistributionMetric(PipelineResult result, String namespace, String name, + DistributionType distType, long defaultValue) { + MetricQueryResults metrics = result.metrics().queryMetrics( + MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); + Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions(); try { - Collection<Long> values = job.getAggregatorValues(aggregator).getValues(); - if (values.size() != 1) { - return -1; - } - long value = Iterables.getOnlyElement(values); - if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { - return -1; + MetricResult<DistributionResult> distributionResult = distributions.iterator().next(); + if (distType.equals(DistributionType.MIN)) { + return distributionResult.attempted().min(); + } else if (distType.equals(DistributionType.MAX)) { + return distributionResult.attempted().max(); + } else { + //TODO Ismael } - return value; - } catch (AggregatorRetrievalException e) { + } catch (NoSuchElementException e) { + //TODO Ismael + } + return defaultValue; + } + + private enum DistributionType {MIN, MAX} + + /** + * Return the current value for a time counter, or -1 if can't be retrieved. + */ + protected long getTimestampMetric(long now, long value) { + //TODO Ismael improve doc + if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { return -1; } + return value; } /** @@ -294,21 +317,46 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { * Return the current performance given {@code eventMonitor} and {@code resultMonitor}. 
*/ private NexmarkPerf currentPerf( - long startMsSinceEpoch, long now, PipelineResult job, + long startMsSinceEpoch, long now, PipelineResult result, List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor, Monitor<?> resultMonitor) { NexmarkPerf perf = new NexmarkPerf(); - long numEvents = getLong(job, eventMonitor.getElementCounter()); - long numEventBytes = getLong(job, eventMonitor.getBytesCounter()); - long eventStart = getTimestamp(now, job, eventMonitor.getStartTime()); - long eventEnd = getTimestamp(now, job, eventMonitor.getEndTime()); - long numResults = getLong(job, resultMonitor.getElementCounter()); - long numResultBytes = getLong(job, resultMonitor.getBytesCounter()); - long resultStart = getTimestamp(now, job, resultMonitor.getStartTime()); - long resultEnd = getTimestamp(now, job, resultMonitor.getEndTime()); - long timestampStart = getTimestamp(now, job, resultMonitor.getStartTimestamp()); - long timestampEnd = getTimestamp(now, job, resultMonitor.getEndTimestamp()); + long numEvents = + getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1); + long numEventBytes = + getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1); + long eventStart = + getTimestampMetric(now, + getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime", + DistributionType.MIN, -1)); + long eventEnd = + getTimestampMetric(now, + getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime", + DistributionType.MAX, -1)); + + long numResults = + getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1); + long numResultBytes = + getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1); + long resultStart = + getTimestampMetric(now, + getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime", + DistributionType.MIN, -1)); + long resultEnd = + getTimestampMetric(now, + 
getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime", + DistributionType.MAX, -1)); + long timestampStart = + getTimestampMetric(now, + getDistributionMetric(result, + resultMonitor.name, resultMonitor.prefix + ".startTimestamp", + DistributionType.MIN, -1)); + long timestampEnd = + getTimestampMetric(now, + getDistributionMetric(result, + resultMonitor.name, resultMonitor.prefix + ".endTimestamp", + DistributionType.MAX, -1)); long effectiveEnd = -1; if (eventEnd >= 0 && resultEnd >= 0) { @@ -372,7 +420,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { perf.shutdownDelaySec = (now - resultEnd) / 1000.0; } - perf.jobId = getJobId(job); + perf.jobId = getJobId(result); // As soon as available, try to capture cumulative cost at this point too. NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot(); @@ -574,9 +622,10 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { if (options.isStreaming() && !waitingForShutdown) { Duration quietFor = new Duration(lastActivityMsSinceEpoch, now); - if (query.getFatalCount() != null && getLong(job, query.getFatalCount()) > 0) { + long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0); + if (fatalCount > 0) { NexmarkUtils.console("job has fatal errors, cancelling."); - errors.add(String.format("Pipeline reported %s fatal errors", query.getFatalCount())); + errors.add(String.format("Pipeline reported %s fatal errors", fatalCount)); waitingForShutdown = true; } else if (configuration.debug && configuration.numEvents > 0 && currPerf.numEvents == configuration.numEvents @@ -1033,7 +1082,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { if (c.element().hashCode() % 2 == 0) { c.output(c.element()); } else { - c.sideOutput(SIDE, c.element()); + c.output(SIDE, c.element()); } } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index a47ebcc..18589c4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -53,12 +53,12 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -419,48 +419,42 @@ public class NexmarkUtils { */ public static ParDo.SingleOutput<Event, Event> snoop(final String name) { return ParDo.of(new DoFn<Event, Event>() { - final Aggregator<Long, Long> eventCounter = - createAggregator("events", Sum.ofLongs()); - final Aggregator<Long, Long> newPersonCounter = - createAggregator("newPersons", Sum.ofLongs()); - final Aggregator<Long, Long> newAuctionCounter = - createAggregator("newAuctions", Sum.ofLongs()); - final Aggregator<Long, Long> bidCounter = - createAggregator("bids", Sum.ofLongs()); - final Aggregator<Long, Long> 
endOfStreamCounter = - createAggregator("endOfStream", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c) { - eventCounter.addValue(1L); - if (c.element().newPerson != null) { - newPersonCounter.addValue(1L); - } else if (c.element().newAuction != null) { - newAuctionCounter.addValue(1L); - } else if (c.element().bid != null) { - bidCounter.addValue(1L); - } else { - endOfStreamCounter.addValue(1L); - } - info("%s snooping element %s", name, c.element()); - c.output(c.element()); - } - }); + final Counter eventCounter = Metrics.counter(name, "events"); + final Counter newPersonCounter = Metrics.counter(name, "newPersons"); + final Counter newAuctionCounter = Metrics.counter(name, "newAuctions"); + final Counter bidCounter = Metrics.counter(name, "bids"); + final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream"); + + @ProcessElement + public void processElement(ProcessContext c) { + eventCounter.inc(); + if (c.element().newPerson != null) { + newPersonCounter.inc(); + } else if (c.element().newAuction != null) { + newAuctionCounter.inc(); + } else if (c.element().bid != null) { + bidCounter.inc(); + } else { + endOfStreamCounter.inc(); + } + info("%s snooping element %s", name, c.element()); + c.output(c.element()); + } + }); } /** * Return a transform to count and discard each element. 
*/ - public static <T> ParDo.SingleOutput<T, Void> devNull(String name) { + public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) { return ParDo.of(new DoFn<T, Void>() { - final Aggregator<Long, Long> discardCounter = - createAggregator("discarded", Sum.ofLongs()); + final Counter discardedCounterMetric = Metrics.counter(name, "discarded"); - @ProcessElement - public void processElement(ProcessContext c) { - discardCounter.addValue(1L); - } - }); + @ProcessElement + public void processElement(ProcessContext c) { + discardedCounterMetric.inc(); + } + }); } /** @@ -468,28 +462,27 @@ public class NexmarkUtils { */ public static <T> ParDo.SingleOutput<T, T> log(final String name) { return ParDo.of(new DoFn<T, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - LOG.info("%s: %s", name, c.element()); - c.output(c.element()); - } - }); + @ProcessElement + public void processElement(ProcessContext c) { + LOG.info("%s: %s", name, c.element()); + c.output(c.element()); + } + }); } /** * Return a transform to format each element as a string. 
*/ - public static <T> ParDo.SingleOutput<T, String> format(String name) { + public static <T> ParDo.SingleOutput<T, String> format(final String name) { return ParDo.of(new DoFn<T, String>() { - final Aggregator<Long, Long> recordCounter = - createAggregator("records", Sum.ofLongs()); + final Counter recordCounterMetric = Metrics.counter(name, "records"); - @ProcessElement - public void processElement(ProcessContext c) { - recordCounter.addValue(1L); - c.output(c.element().toString()); - } - }); + @ProcessElement + public void processElement(ProcessContext c) { + recordCounterMetric.inc(); + c.output(c.element().toString()); + } + }); } /** @@ -497,11 +490,11 @@ public class NexmarkUtils { */ public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) { return ParDo.of(new DoFn<T, TimestampedValue<T>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(TimestampedValue.of(c.element(), c.timestamp())); - } - }); + @ProcessElement + public void processElement(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + }); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java index 9f1ddf8..f2566b8 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java @@ -40,11 +40,11 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; -import 
org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; @@ -323,56 +323,52 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct // Find the highest price valid bid for each closed auction. return - // Join auctions and bids. - KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById) - .and(NexmarkQuery.BID_TAG, bidsByAuctionId) - .apply(CoGroupByKey.<Long>create()) - - // Filter and select. - .apply(name + ".Join", - ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { - final Aggregator<Long, Long> noAuctionCounter = - createAggregator("noAuction", Sum.ofLongs()); - final Aggregator<Long, Long> underReserveCounter = - createAggregator("underReserve", Sum.ofLongs()); - final Aggregator<Long, Long> noValidBidsCounter = - createAggregator("noValidBids", Sum.ofLongs()); - - - @ProcessElement - public void processElement(ProcessContext c) { - Auction auction = - c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null); - if (auction == null) { - // We have bids without a matching auction. Give up. - noAuctionCounter.addValue(1L); - return; - } - // Find the current winning bid for auction. - // The earliest bid with the maximum price above the reserve wins. - Bid bestBid = null; - for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) { - // Bids too late for their auction will have been - // filtered out by the window merge function. - checkState(bid.dateTime < auction.expires); - if (bid.price < auction.reserve) { - // Bid price is below auction reserve. 
- underReserveCounter.addValue(1L); - continue; - } - - if (bestBid == null - || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) { - bestBid = bid; - } - } - if (bestBid == null) { - // We don't have any valid bids for auction. - noValidBidsCounter.addValue(1L); - return; - } - c.output(new AuctionBid(auction, bestBid)); - } - })); + // Join auctions and bids. + KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById) + .and(NexmarkQuery.BID_TAG, bidsByAuctionId) + .apply(CoGroupByKey.<Long>create()) + // Filter and select. + .apply(name + ".Join", + ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { + private final Counter noAuctionCounter = Metrics.counter(name, "noAuction"); + private final Counter underReserveCounter = Metrics.counter(name, "underReserve"); + private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids"); + + @ProcessElement + public void processElement(ProcessContext c) { + Auction auction = + c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null); + if (auction == null) { + // We have bids without a matching auction. Give up. + noAuctionCounter.inc(); + return; + } + // Find the current winning bid for auction. + // The earliest bid with the maximum price above the reserve wins. + Bid bestBid = null; + for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) { + // Bids too late for their auction will have been + // filtered out by the window merge function. + checkState(bid.dateTime < auction.expires); + if (bid.price < auction.reserve) { + // Bid price is below auction reserve. + underReserveCounter.inc(); + continue; + } + + if (bestBid == null + || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) { + bestBid = bid; + } + } + if (bestBid == null) { + // We don't have any valid bids for auction. 
+ noValidBidsCounter.inc(); + return; + } + c.output(new AuctionBid(auction, bestBid)); + } + } + )); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java index 7ffd47a..935bf0d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java @@ -130,7 +130,9 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl NexmarkUtils.console("%s publisher (%d events)", state, numEvents); return; case RUNNING: - numEvents = getLong(job, publisherMonitor.getElementCounter()); + //TODO Ismael Validate that this counter is ok + numEvents = + getCounterMetric(job, publisherMonitor.name, publisherMonitor.prefix + ".elements", -1); if (startMsSinceEpoch < 0 && numEvents > 0) { startMsSinceEpoch = System.currentTimeMillis(); endMsSinceEpoch = startMsSinceEpoch http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java index f60d5de..84696c4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java +++ 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java @@ -28,10 +28,10 @@ import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PCollection; /** @@ -49,15 +49,15 @@ public class Query0 extends NexmarkQuery { // Force round trip through coder. .apply(name + ".Serialize", ParDo.of(new DoFn<Event, Event>() { - private final Aggregator<Long, Long> bytes = - createAggregator("bytes", Sum.ofLongs()); + private final Counter bytesMetric = + Metrics.counter(name , "bytes"); @ProcessElement public void processElement(ProcessContext c) throws CoderException, IOException { ByteArrayOutputStream outStream = new ByteArrayOutputStream(); coder.encode(c.element(), outStream, Coder.Context.OUTER); byte[] byteArray = outStream.toByteArray(); - bytes.addValue((long) byteArray.length); + bytesMetric.inc((long) byteArray.length); ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray); Event event = coder.decode(inStream, Coder.Context.OUTER); c.output(event); http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java index 5246427..d9b3557 100644 --- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java @@ -34,12 +34,12 @@ import org.apache.beam.integration.nexmark.model.Done; import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.AfterEach; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -184,196 +184,189 @@ public class Query10 extends NexmarkQuery { private PCollection<Done> applyTyped(PCollection<Event> events) { final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER; - return events.apply(name + ".ShardEvents", - ParDo.of(new DoFn<Event, KV<String, Event>>() { - final Aggregator<Long, Long> lateCounter = - createAggregator("actuallyLateEvent", Sum.ofLongs()); - final Aggregator<Long, Long> onTimeCounter = - createAggregator("actuallyOnTimeEvent", Sum.ofLongs()); + return events + .apply(name + ".ShardEvents", + ParDo.of(new DoFn<Event, KV<String, Event>>() { + private final Counter lateCounter = Metrics.counter(name , "actuallyLateEvent"); + private final Counter onTimeCounter = Metrics.counter(name , "onTimeCounter"); - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().hasAnnotation("LATE")) { - lateCounter.addValue(1L); - LOG.error("Observed late: %s", c.element()); - } else { - onTimeCounter.addValue(1L); - } - int shardNum = (int) 
Math.abs((long) c.element().hashCode() % numLogShards); - String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards); - c.output(KV.of(shard, c.element())); - } - })) - .apply(name + ".WindowEvents", - Window.<KV<String, Event>>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(AfterEach.inOrder( - Repeatedly - .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents), - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(LATE_BATCHING_PERIOD))))) - .discardingFiredPanes() - // Use a 1 day allowed lateness so that any forgotten hold will stall the - // pipeline for that period and be very noticeable. - .withAllowedLateness(Duration.standardDays(1))) - .apply(name + ".GroupByKey", GroupByKey.<String, Event>create()) - .apply(name + ".CheckForLateEvents", - ParDo.of(new DoFn<KV<String, Iterable<Event>>, - KV<String, Iterable<Event>>>() { - final Aggregator<Long, Long> earlyCounter = - createAggregator("earlyShard", Sum.ofLongs()); - final Aggregator<Long, Long> onTimeCounter = - createAggregator("onTimeShard", Sum.ofLongs()); - final Aggregator<Long, Long> lateCounter = - createAggregator("lateShard", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedLatePaneCounter = - createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedOnTimeElementCounter = - createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs()); + @ProcessElement + public void processElement(ProcessContext c) { + if (c.element().hasAnnotation("LATE")) { + lateCounter.inc(); + LOG.error("Observed late: %s", c.element()); + } else { + onTimeCounter.inc(); + } + int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards); + String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards); + 
c.output(KV.of(shard, c.element())); + } + })) + .apply(name + ".WindowEvents", + Window.<KV<String, Event>>into( + FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) + .triggering(AfterEach.inOrder( + Repeatedly + .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever( + AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(LATE_BATCHING_PERIOD))))) + .discardingFiredPanes() + // Use a 1 day allowed lateness so that any forgotten hold will stall the + // pipeline for that period and be very noticeable. + .withAllowedLateness(Duration.standardDays(1))) + .apply(name + ".GroupByKey", GroupByKey.<String, Event>create()) + .apply(name + ".CheckForLateEvents", + ParDo.of(new DoFn<KV<String, Iterable<Event>>, + KV<String, Iterable<Event>>>() { + private final Counter earlyCounter = Metrics.counter(name , "earlyShard"); + private final Counter onTimeCounter = Metrics.counter(name , "onTimeShard"); + private final Counter lateCounter = Metrics.counter(name , "lateShard"); + private final Counter unexpectedLatePaneCounter = + Metrics.counter(name , "ERROR_unexpectedLatePane"); + private final Counter unexpectedOnTimeElementCounter = + Metrics.counter(name , "ERROR_unexpectedOnTimeElement"); - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - int numLate = 0; - int numOnTime = 0; - for (Event event : c.element().getValue()) { - if (event.hasAnnotation("LATE")) { - numLate++; - } else { - numOnTime++; - } - } - String shard = c.element().getKey(); - LOG.error( - "%s with timestamp %s has %d actually late and %d on-time " - + "elements in pane %s for window %s", - shard, c.timestamp(), numLate, numOnTime, c.pane(), - window.maxTimestamp()); - if (c.pane().getTiming() == PaneInfo.Timing.LATE) { - if (numLate == 0) { - LOG.error( - "ERROR! 
No late events in late pane for %s", shard); - unexpectedLatePaneCounter.addValue(1L); - } - if (numOnTime > 0) { - LOG.error( - "ERROR! Have %d on-time events in late pane for %s", - numOnTime, shard); - unexpectedOnTimeElementCounter.addValue(1L); - } - lateCounter.addValue(1L); - } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) { - if (numOnTime + numLate < configuration.maxLogEvents) { - LOG.error( - "ERROR! Only have %d events in early pane for %s", - numOnTime + numLate, shard); - } - earlyCounter.addValue(1L); - } else { - onTimeCounter.addValue(1L); - } - c.output(c.element()); - } - })) - .apply(name + ".UploadEvents", - ParDo.of(new DoFn<KV<String, Iterable<Event>>, - KV<Void, OutputFile>>() { - final Aggregator<Long, Long> savedFileCounter = - createAggregator("savedFile", Sum.ofLongs()); - final Aggregator<Long, Long> writtenRecordsCounter = - createAggregator("writtenRecords", Sum.ofLongs()); + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + int numLate = 0; + int numOnTime = 0; + for (Event event : c.element().getValue()) { + if (event.hasAnnotation("LATE")) { + numLate++; + } else { + numOnTime++; + } + } + String shard = c.element().getKey(); + LOG.error( + "%s with timestamp %s has %d actually late and %d on-time " + + "elements in pane %s for window %s", + shard, c.timestamp(), numLate, numOnTime, c.pane(), + window.maxTimestamp()); + if (c.pane().getTiming() == PaneInfo.Timing.LATE) { + if (numLate == 0) { + LOG.error( + "ERROR! No late events in late pane for %s", shard); + unexpectedLatePaneCounter.inc(); + } + if (numOnTime > 0) { + LOG.error( + "ERROR! Have %d on-time events in late pane for %s", + numOnTime, shard); + unexpectedOnTimeElementCounter.inc(); + } + lateCounter.inc(); + } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) { + if (numOnTime + numLate < configuration.maxLogEvents) { + LOG.error( + "ERROR! 
Only have %d events in early pane for %s", + numOnTime + numLate, shard); + } + earlyCounter.inc(); + } else { + onTimeCounter.inc(); + } + c.output(c.element()); + } + })) + .apply(name + ".UploadEvents", + ParDo.of(new DoFn<KV<String, Iterable<Event>>, + KV<Void, OutputFile>>() { + private final Counter savedFileCounter = Metrics.counter(name , "savedFile"); + private final Counter writtenRecordsCounter = Metrics.counter(name , "writtenRecords"); - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) - throws IOException { - String shard = c.element().getKey(); - GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); - OutputFile outputFile = outputFileFor(window, shard, c.pane()); - LOG.error( - "Writing %s with record timestamp %s, window timestamp %s, pane %s", - shard, c.timestamp(), window.maxTimestamp(), c.pane()); - if (outputFile.filename != null) { - LOG.error("Beginning write to '%s'", outputFile.filename); - int n = 0; - try (OutputStream output = - Channels.newOutputStream(openWritableGcsFile(options, outputFile - .filename))) { - for (Event event : c.element().getValue()) { - Event.CODER.encode(event, output, Coder.Context.OUTER); - writtenRecordsCounter.addValue(1L); - if (++n % 10000 == 0) { - LOG.error("So far written %d records to '%s'", n, - outputFile.filename); - } - } - } - LOG.error("Written all %d records to '%s'", n, outputFile.filename); - } - savedFileCounter.addValue(1L); - c.output(KV.<Void, OutputFile>of(null, outputFile)); - } - })) - // Clear fancy triggering from above. - .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(AfterWatermark.pastEndOfWindow()) - // We expect no late data here, but we'll assume the worst so we can detect any. 
- .withAllowedLateness(Duration.standardDays(1)) - .discardingFiredPanes()) + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) + throws IOException { + String shard = c.element().getKey(); + GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); + OutputFile outputFile = outputFileFor(window, shard, c.pane()); + LOG.error( + "Writing %s with record timestamp %s, window timestamp %s, pane %s", + shard, c.timestamp(), window.maxTimestamp(), c.pane()); + if (outputFile.filename != null) { + LOG.error("Beginning write to '%s'", outputFile.filename); + int n = 0; + try (OutputStream output = + Channels.newOutputStream(openWritableGcsFile(options, outputFile + .filename))) { + for (Event event : c.element().getValue()) { + Event.CODER.encode(event, output, Coder.Context.OUTER); + writtenRecordsCounter.inc(); + if (++n % 10000 == 0) { + LOG.error("So far written %d records to '%s'", n, + outputFile.filename); + } + } + } + LOG.error("Written all %d records to '%s'", n, outputFile.filename); + } + savedFileCounter.inc(); + c.output(KV.<Void, OutputFile>of(null, outputFile)); + } + })) + // Clear fancy triggering from above. + .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into( + FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) + .triggering(AfterWatermark.pastEndOfWindow()) + // We expect no late data here, but we'll assume the worst so we can detect any. 
+ .withAllowedLateness(Duration.standardDays(1)) + .discardingFiredPanes()) // this GroupByKey allows to have one file per window .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create()) - .apply(name + ".Index", - ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { - final Aggregator<Long, Long> unexpectedLateCounter = - createAggregator("ERROR_unexpectedLate", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedEarlyCounter = - createAggregator("ERROR_unexpectedEarly", Sum.ofLongs()); - final Aggregator<Long, Long> unexpectedIndexCounter = - createAggregator("ERROR_unexpectedIndex", Sum.ofLongs()); - final Aggregator<Long, Long> finalizedCounter = - createAggregator("indexed", Sum.ofLongs()); + .apply(name + ".Index", + ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { + private final Counter unexpectedLateCounter = + Metrics.counter(name , "ERROR_unexpectedLate"); + private final Counter unexpectedEarlyCounter = + Metrics.counter(name , "ERROR_unexpectedEarly"); + private final Counter unexpectedIndexCounter = + Metrics.counter(name , "ERROR_unexpectedIndex"); + private final Counter finalizedCounter = Metrics.counter(name , "indexed"); - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) - throws IOException { - if (c.pane().getTiming() == Timing.LATE) { - unexpectedLateCounter.addValue(1L); - LOG.error("ERROR! Unexpected LATE pane: %s", c.pane()); - } else if (c.pane().getTiming() == Timing.EARLY) { - unexpectedEarlyCounter.addValue(1L); - LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane()); - } else if (c.pane().getTiming() == Timing.ON_TIME - && c.pane().getIndex() != 0) { - unexpectedIndexCounter.addValue(1L); - LOG.error("ERROR! 
Unexpected ON_TIME pane index: %s", c.pane()); - } else { - GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); - LOG.error( - "Index with record timestamp %s, window timestamp %s, pane %s", - c.timestamp(), window.maxTimestamp(), c.pane()); + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) + throws IOException { + if (c.pane().getTiming() == Timing.LATE) { + unexpectedLateCounter.inc(); + LOG.error("ERROR! Unexpected LATE pane: %s", c.pane()); + } else if (c.pane().getTiming() == Timing.EARLY) { + unexpectedEarlyCounter.inc(); + LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane()); + } else if (c.pane().getTiming() == Timing.ON_TIME + && c.pane().getIndex() != 0) { + unexpectedIndexCounter.inc(); + LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane()); + } else { + GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); + LOG.error( + "Index with record timestamp %s, window timestamp %s, pane %s", + c.timestamp(), window.maxTimestamp(), c.pane()); - @Nullable String filename = indexPathFor(window); - if (filename != null) { - LOG.error("Beginning write to '%s'", filename); - int n = 0; - try (OutputStream output = - Channels.newOutputStream( - openWritableGcsFile(options, filename))) { - for (OutputFile outputFile : c.element().getValue()) { - output.write(outputFile.toString().getBytes()); - n++; - } - } - LOG.error("Written all %d lines to '%s'", n, filename); - } - c.output( - new Done("written for timestamp " + window.maxTimestamp())); - finalizedCounter.addValue(1L); - } - } - })); + @Nullable String filename = indexPathFor(window); + if (filename != null) { + LOG.error("Beginning write to '%s'", filename); + int n = 0; + try (OutputStream output = + Channels.newOutputStream( + openWritableGcsFile(options, filename))) { + for (OutputFile outputFile : c.element().getValue()) { + output.write(outputFile.toString().getBytes()); + n++; + } + } + LOG.error("Written all %d lines to '%s'", n, 
filename); + } + c.output( + new Done("written for timestamp " + window.maxTimestamp())); + finalizedCounter.inc(); + } + } + })); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java index ba31e9f..12b16f1 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java @@ -20,7 +20,6 @@ package org.apache.beam.integration.nexmark.queries; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import javax.annotation.Nullable; import org.apache.beam.integration.nexmark.NexmarkConfiguration; import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; @@ -30,12 +29,12 @@ import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.integration.nexmark.model.NameCityStateId; import org.apache.beam.integration.nexmark.model.Person; import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; @@ -81,14 +80,7 @@ 
public class Query3 extends NexmarkQuery { public Query3(NexmarkConfiguration configuration) { super(configuration, "Query3"); - joinDoFn = new JoinDoFn(configuration.maxAuctionsWaitingTime); - - } - - @Override - @Nullable - public Aggregator<Long, Long> getFatalCount() { - return joinDoFn.fatalCounter; + joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime); } private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) { @@ -195,8 +187,6 @@ public class Query3 extends NexmarkQuery { private static final String PERSON_STATE_EXPIRING = "personStateExpiring"; - public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs()); - @StateId(AUCTIONS) private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec = StateSpecs.value(ListCoder.of(Auction.CODER)); @@ -204,19 +194,25 @@ public class Query3 extends NexmarkQuery { @TimerId(PERSON_STATE_EXPIRING) private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - private final Aggregator<Long, Long> newAuctionCounter = - createAggregator("newAuction", Sum.ofLongs()); - private final Aggregator<Long, Long> newPersonCounter = - createAggregator("newPerson", Sum.ofLongs()); - private final Aggregator<Long, Long> newNewOutputCounter = - createAggregator("newNewOutput", Sum.ofLongs()); - private final Aggregator<Long, Long> newOldOutputCounter = - createAggregator("newOldOutput", Sum.ofLongs()); - private final Aggregator<Long, Long> oldNewOutputCounter = - createAggregator("oldNewOutput", Sum.ofLongs()); + // Used to refer the metrics namespace + private final String name; - private JoinDoFn(int maxAuctionsWaitingTime) { + private final Counter newAuctionCounter; + private final Counter newPersonCounter; + private final Counter newNewOutputCounter; + private final Counter newOldOutputCounter; + private final Counter oldNewOutputCounter; + private final Counter fatalCounter; + + private JoinDoFn(String name, int maxAuctionsWaitingTime) { + 
this.name = name; this.maxAuctionsWaitingTime = maxAuctionsWaitingTime; + newAuctionCounter = Metrics.counter(name, "newAuction"); + newPersonCounter = Metrics.counter(name, "newPerson"); + newNewOutputCounter = Metrics.counter(name, "newNewOutput"); + newOldOutputCounter = Metrics.counter(name, "newOldOutput"); + oldNewOutputCounter = Metrics.counter(name, "oldNewOutput"); + fatalCounter = Metrics.counter(name , "fatal"); } @ProcessElement @@ -232,14 +228,13 @@ public class Query3 extends NexmarkQuery { // we need to wait for the pending ReduceFn API. Person existingPerson = personState.read(); - if (existingPerson != null) { // We've already seen the new person event for this person id. // We can join with any new auctions on-the-fly without needing any // additional persistent state. for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) { - newAuctionCounter.addValue(1L); - newOldOutputCounter.addValue(1L); + newAuctionCounter.inc(); + newOldOutputCounter.inc(); c.output(KV.of(newAuction, existingPerson)); } return; @@ -255,24 +250,24 @@ public class Query3 extends NexmarkQuery { } else { LOG.error("**** conflicting persons {} and {} ****", theNewPerson, newPerson); } - fatalCounter.addValue(1L); + fatalCounter.inc(); continue; } - newPersonCounter.addValue(1L); + newPersonCounter.inc(); // We've now seen the person for this person id so can flush any // pending auctions for the same seller id (an auction is done by only one seller). List<Auction> pendingAuctions = auctionsState.read(); if (pendingAuctions != null) { for (Auction pendingAuction : pendingAuctions) { - oldNewOutputCounter.addValue(1L); + oldNewOutputCounter.inc(); c.output(KV.of(pendingAuction, newPerson)); } auctionsState.clear(); } // Also deal with any new auctions. 
for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) { - newAuctionCounter.addValue(1L); - newNewOutputCounter.addValue(1L); + newAuctionCounter.inc(); + newNewOutputCounter.inc(); c.output(KV.of(newAuction, newPerson)); } // Remember this person for any future auctions. @@ -293,17 +288,17 @@ public class Query3 extends NexmarkQuery { pendingAuctions = new ArrayList<>(); } for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) { - newAuctionCounter.addValue(1L); + newAuctionCounter.inc(); pendingAuctions.add(newAuction); } auctionsState.write(pendingAuctions); } - @OnTimer(PERSON_STATE_EXPIRING) - public void onTimerCallback( - OnTimerContext context, - @StateId(PERSON) ValueState<Person> personState) { - personState.clear(); - } + @OnTimer(PERSON_STATE_EXPIRING) + public void onTimerCallback( + OnTimerContext context, + @StateId(PERSON) ValueState<Person> personState) { + personState.clear(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java index be74151..43d6690 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java @@ -156,7 +156,7 @@ public class BoundedEventSource extends BoundedSource<Event> { } @Override - public List<BoundedEventSource> splitIntoBundles( + public List<BoundedEventSource> split( long desiredBundleSizeBytes, PipelineOptions options) { NexmarkUtils.info("slitting bounded source %s into %d sub-sources", config, 
numEventGenerators); List<BoundedEventSource> results = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java index 286c576..c3c6eb0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java @@ -289,7 +289,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check } @Override - public List<UnboundedEventSource> generateInitialSplits( + public List<UnboundedEventSource> split( int desiredNumSplits, PipelineOptions options) { LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators); List<UnboundedEventSource> results = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java index 3f85bab..c5d7725 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java @@ -66,6 +66,6 @@ public class 
BoundedEventSourceTest { long n = 200L; BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); SourceTestUtils.assertSourcesEqualReferenceSource( - source, source.splitIntoBundles(10, options), options); + source, source.split(10, options), options); } }
