Clean, fix findbugs, fix checkstyle
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f9b4948 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f9b4948 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f9b4948 Branch: refs/heads/master Commit: 2f9b4948fd60a749ada832d003acf0bd84875fcb Parents: 6c11670 Author: Etienne Chauchot <[email protected]> Authored: Tue May 30 18:00:00 2017 +0100 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:29 2017 +0200 ---------------------------------------------------------------------- .../nexmark/NexmarkConfiguration.java | 9 +- .../integration/nexmark/NexmarkLauncher.java | 62 +++++------- .../integration/nexmark/NexmarkOptions.java | 3 +- .../beam/integration/nexmark/NexmarkSuite.java | 4 +- .../beam/integration/nexmark/model/Event.java | 99 ++++++++++---------- .../nexmark/queries/Query0Model.java | 1 - .../nexmark/queries/Query1Model.java | 1 - .../integration/nexmark/queries/Query3.java | 8 +- .../integration/nexmark/queries/Query5.java | 68 ++++++++------ .../integration/nexmark/queries/Query7.java | 2 +- .../nexmark/queries/Query7Model.java | 1 - .../nexmark/queries/WinningBids.java | 37 +++++++- .../nexmark/queries/WinningBidsSimulator.java | 1 - .../integration/nexmark/sources/Generator.java | 36 +++++-- .../nexmark/sources/GeneratorConfig.java | 29 +++--- .../integration/nexmark/queries/QueryTest.java | 6 +- .../sources/UnboundedEventSourceTest.java | 6 +- 17 files changed, 211 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java index 5a8cb71..2faf3f5 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java @@ -19,7 +19,6 @@ package org.apache.beam.integration.nexmark; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.Serializable; import java.util.Objects; @@ -359,11 +358,11 @@ public class NexmarkConfiguration implements Serializable { } /** - * Return clone of configuration with given label. + * Return copy of configuration with given label. */ - @Override - public NexmarkConfiguration clone() { - NexmarkConfiguration result = new NexmarkConfiguration(); + public NexmarkConfiguration copy() { + NexmarkConfiguration result; + result = new NexmarkConfiguration(); result.debug = debug; result.query = query; result.sourceType = sourceType; http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java index db53191..a609975 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java @@ -87,11 +87,13 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; +import org.slf4j.LoggerFactory; /** * 
Run a single Nexmark query using a given configuration. */ public class NexmarkLauncher<OptionT extends NexmarkOptions> { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class); /** * Minimum number of samples needed for 'stead-state' rate calculation. */ @@ -166,13 +168,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { } /** - * Return number of cores per worker. - */ - protected int coresPerWorker() { - return 4; - } - - /** * Return maximum number of workers. */ private int maxNumWorkers() { @@ -185,7 +180,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { */ private long getCounterMetric(PipelineResult result, String namespace, String name, long defaultValue) { - //TODO Ismael calc this only once MetricQueryResults metrics = result.metrics().queryMetrics( MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); Iterable<MetricResult<Long>> counters = metrics.counters(); @@ -193,7 +187,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { MetricResult<Long> metricResult = counters.iterator().next(); return metricResult.attempted(); } catch (NoSuchElementException e) { - //TODO Ismael + LOG.error("Failed to get metric {}, from namespace {}", name, namespace); } return defaultValue; } @@ -209,15 +203,20 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions(); try { MetricResult<DistributionResult> distributionResult = distributions.iterator().next(); - if (distType.equals(DistributionType.MIN)) { - return distributionResult.attempted().min(); - } else if (distType.equals(DistributionType.MAX)) { - return distributionResult.attempted().max(); - } else { - //TODO Ismael + switch (distType) + { + case MIN: + return distributionResult.attempted().min(); + case MAX: + return distributionResult.attempted().max(); + default: + return defaultValue; } } catch 
(NoSuchElementException e) { - //TODO Ismael + LOG.error( + "Failed to get distribution metric {} for namespace {}", + name, + namespace); } return defaultValue; } @@ -228,7 +227,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { * Return the current value for a time counter, or -1 if can't be retrieved. */ private long getTimestampMetric(long now, long value) { - //TODO Ismael improve doc + // timestamp metrics are used to monitor time of execution of transforms. + // If result timestamp metric is too far from now, consider that metric is erroneous + if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { return -1; } @@ -437,16 +438,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { */ private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) { builder.build(options); -// throw new UnsupportedOperationException( -// "Cannot use --pubSubMode=COMBINED with DirectRunner"); - } - - /** - * If monitoring, wait until the publisher pipeline has run long enough to establish - * a backlog on the Pubsub topic. Otherwise, return immediately. 
- */ - private void waitForPublisherPreload() { - throw new UnsupportedOperationException(); } /** @@ -606,11 +597,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { publisherJob.waitUntilFinish(Duration.standardMinutes(5)); } catch (IOException e) { throw new RuntimeException("Unable to cancel publisher job: ", e); - } //TODO Ismael -// catch (InterruptedException e) { -// Thread.interrupted(); -// throw new RuntimeException("Interrupted: publish job still running.", e); -// } + } } return perf; @@ -755,7 +742,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); c.output(event); } catch (CoderException e) { - // TODO Log decoding Event error + LOG.error("Error while decoding Event from pusbSub message: serialization error"); } } })); @@ -798,7 +785,8 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); c.output(new PubsubMessage(payload, new HashMap<String, String>())); } catch (CoderException e1) { - // TODO Log encoding Event error + LOG.error("Error while sending Event {} to pusbSub: serialization error", + c.element().toString()); } } }) @@ -1130,7 +1118,8 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { sinkEventsToAvro(source); } - // Special hacks for Query 10 (big logger). + // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs, + // so, set parallelism. Also set the output path where to write log files. 
if (configuration.query == 10) { String path = null; if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) { @@ -1158,9 +1147,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { sink(results, now); } - if (publisherResult != null) { - waitForPublisherPreload(); - } mainResult = p.run(); mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout)); return monitor(query); http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java index 9afffaa..fbd3e74 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java @@ -19,7 +19,6 @@ package org.apache.beam.integration.nexmark; import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -119,7 +118,7 @@ public interface NexmarkOptions @Nullable Integer getStreamTimeout(); - void setStreamTimeout(Integer preloadSeconds); + void setStreamTimeout(Integer streamTimeout); @Description("Number of unbounded sources to create events.") @Nullable http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java ---------------------------------------------------------------------- diff --git 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java index be7d7b8..0d98a5d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java @@ -57,7 +57,7 @@ public enum NexmarkSuite { private static List<NexmarkConfiguration> smoke() { List<NexmarkConfiguration> configurations = new ArrayList<>(); for (int query = 0; query <= 12; query++) { - NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.clone(); + NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy(); configuration.query = query; configuration.numEvents = 100_000; if (query == 4 || query == 6 || query == 9) { @@ -103,7 +103,7 @@ public enum NexmarkSuite { public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) { Set<NexmarkConfiguration> results = new LinkedHashSet<>(); for (NexmarkConfiguration configuration : configurations) { - NexmarkConfiguration result = configuration.clone(); + NexmarkConfiguration result = configuration.copy(); result.overrideFromOptions(options); results.add(result); } http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java index d813833..0e1672e 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java @@ -23,55 +23,65 @@ import java.io.OutputStream; 
import java.io.Serializable; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; /** - * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, - * or a {@link Bid}. + * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a + * {@link Bid}. */ public class Event implements KnownSize, Serializable { - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + private enum Tag { + PERSON(0), + AUCTION(1), + BID(2); - public static final Coder<Event> CODER = new CustomCoder<Event>() { - @Override - public void encode(Event value, OutputStream outStream) - throws CoderException, IOException { - if (value.newPerson != null) { - INT_CODER.encode(0, outStream); - Person.CODER.encode(value.newPerson, outStream); - } else if (value.newAuction != null) { - INT_CODER.encode(1, outStream); - Auction.CODER.encode(value.newAuction, outStream); - } else if (value.bid != null) { - INT_CODER.encode(2, outStream); - Bid.CODER.encode(value.bid, outStream); - } else { - throw new RuntimeException("invalid event"); - } - } + private int value = -1; - @Override - public Event decode( - InputStream inStream) - throws CoderException, IOException { - int tag = INT_CODER.decode(inStream); - if (tag == 0) { - Person person = Person.CODER.decode(inStream); - return new Event(person); - } else if (tag == 1) { - Auction auction = Auction.CODER.decode(inStream); - return new Event(auction); - } else if (tag == 2) { - Bid bid = Bid.CODER.decode(inStream); - return new Event(bid); - } else { - throw new RuntimeException("invalid event encoding"); - } + Tag(int value){ + this.value = value; } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; + } + private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + + 
public static final Coder<Event> CODER = + new CustomCoder<Event>() { + @Override + public void encode(Event value, OutputStream outStream) throws IOException { + if (value.newPerson != null) { + INT_CODER.encode(Tag.PERSON.value, outStream); + Person.CODER.encode(value.newPerson, outStream); + } else if (value.newAuction != null) { + INT_CODER.encode(Tag.AUCTION.value, outStream); + Auction.CODER.encode(value.newAuction, outStream); + } else if (value.bid != null) { + INT_CODER.encode(Tag.BID.value, outStream); + Bid.CODER.encode(value.bid, outStream); + } else { + throw new RuntimeException("invalid event"); + } + } + + @Override + public Event decode(InputStream inStream) throws IOException { + int tag = INT_CODER.decode(inStream); + if (tag == Tag.PERSON.value) { + Person person = Person.CODER.decode(inStream); + return new Event(person); + } else if (tag == Tag.AUCTION.value) { + Auction auction = Auction.CODER.decode(inStream); + return new Event(auction); + } else if (tag == Tag.BID.value) { + Bid bid = Bid.CODER.decode(inStream); + return new Event(bid); + } else { + throw new RuntimeException("invalid event encoding"); + } + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} + }; @Nullable @org.apache.avro.reflect.Nullable @@ -111,10 +121,7 @@ public class Event implements KnownSize, Serializable { this.bid = bid; } - /** - * Return a copy of event which captures {@code annotation}. - * (Used for debugging). - */ + /** Return a copy of event which captures {@code annotation}. (Used for debugging). */ public Event withAnnotation(String annotation) { if (newPerson != null) { return new Event(newPerson.withAnnotation(annotation)); @@ -125,9 +132,7 @@ public class Event implements KnownSize, Serializable { } } - /** - * Does event have {@code annotation}? (Used for debugging.) - */ + /** Does event have {@code annotation}? (Used for debugging.) 
*/ public boolean hasAnnotation(String annotation) { if (newPerson != null) { return newPerson.hasAnnotation(annotation); http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java index 8e65591..e2522b8 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java @@ -45,7 +45,6 @@ public class Query0Model extends NexmarkQueryModel { return; } addResult(timestampedEvent); - //TODO test fails because offset of some hundreds of ms beween expect and actual } } http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java index 5d4de45..f07db80 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java @@ -57,7 +57,6 @@ public class Query1Model extends NexmarkQueryModel implements Serializable { TimestampedValue<Bid> result = TimestampedValue.of(resultBid, timestampedEvent.getTimestamp()); addResult(result); - //TODO test fails because offset of some hundreds of ms beween expect and actual } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java index f74b78d..f2b66d7 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java @@ -29,13 +29,13 @@ import org.apache.beam.integration.nexmark.model.Person; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.ParDo; @@ -243,9 +243,9 @@ public class Query3 extends NexmarkQuery { theNewPerson = newPerson; } else { if (theNewPerson.equals(newPerson)) { - LOG.error("**** duplicate person {} ****", theNewPerson); + LOG.error("Duplicate person {}", theNewPerson); } else { - LOG.error("**** conflicting persons {} and {} ****", theNewPerson, newPerson); + LOG.error("Conflicting persons {} and {}", theNewPerson, newPerson); } fatalCounter.inc(); continue; http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java 
---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java index 1944330..bdf3e5f 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java @@ -63,56 +63,64 @@ public class Query5 extends NexmarkQuery { // Only want the bid events. .apply(JUST_BIDS) // Window the bids into sliding windows. - .apply(Window.<Bid>into( - SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec)) - .every(Duration.standardSeconds(configuration.windowPeriodSec)))) + .apply( + Window.<Bid>into( + SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec)) + .every(Duration.standardSeconds(configuration.windowPeriodSec)))) // Project just the auction id. .apply("BidToAuction", BID_TO_AUCTION) // Count the number of bids per auction id. .apply(Count.<Long>perElement()) - // We'll want to keep all auctions with the maximal number of bids. + // We'll want to keep all auctions with the maximal number of bids. // Start by lifting each into a singleton list. // need to do so because bellow combine returns a list of auctions in the key in case of // equal number of bids. Combine needs to have same input type and return type. 
- .apply(name + ".ToSingletons", - ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() { + .apply( + name + ".ToSingletons", + ParDo.of( + new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() { @ProcessElement public void processElement(ProcessContext c) { - c.output(KV.of(Collections.singletonList(c.element().getKey()), c.element().getValue())); + c.output( + KV.of( + Collections.singletonList(c.element().getKey()), + c.element().getValue())); } })) // Keep only the auction ids with the most bids. .apply( - Combine - .globally(new Combine.BinaryCombineFn<KV<List<Long>, Long>>() { - @Override - public KV<List<Long>, Long> apply( - KV<List<Long>, Long> left, KV<List<Long>, Long> right) { - List<Long> leftBestAuctions = left.getKey(); - long leftCount = left.getValue(); - List<Long> rightBestAuctions = right.getKey(); - long rightCount = right.getValue(); - if (leftCount > rightCount) { - return left; - } else if (leftCount < rightCount) { - return right; - } else { - List<Long> newBestAuctions = new ArrayList<>(); - newBestAuctions.addAll(leftBestAuctions); - newBestAuctions.addAll(rightBestAuctions); - return KV.of(newBestAuctions, leftCount); - } - } - }) + Combine.globally( + new Combine.BinaryCombineFn<KV<List<Long>, Long>>() { + @Override + public KV<List<Long>, Long> apply( + KV<List<Long>, Long> left, KV<List<Long>, Long> right) { + List<Long> leftBestAuctions = left.getKey(); + long leftCount = left.getValue(); + List<Long> rightBestAuctions = right.getKey(); + long rightCount = right.getValue(); + if (leftCount > rightCount) { + return left; + } else if (leftCount < rightCount) { + return right; + } else { + List<Long> newBestAuctions = new ArrayList<>(); + newBestAuctions.addAll(leftBestAuctions); + newBestAuctions.addAll(rightBestAuctions); + return KV.of(newBestAuctions, leftCount); + } + } + }) .withoutDefaults() .withFanout(configuration.fanout)) // Project into result. 
- .apply(name + ".Select", - ParDo.of(new DoFn<KV<List<Long>, Long>, AuctionCount>() { + .apply( + name + ".Select", + ParDo.of( + new DoFn<KV<List<Long>, Long>, AuctionCount>() { @ProcessElement public void processElement(ProcessContext c) { long count = c.element().getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java index 2a94ca9..217d0d4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java @@ -63,7 +63,7 @@ public class Query7 extends NexmarkQuery { // requires an additional scan per window, with the associated cost of snapshotted state and // its I/O. We'll keep this implementation since it illustrates the use of side inputs. 
final PCollectionView<Long> maxPriceView = - slidingBids // + slidingBids .apply("BidToPrice", BID_TO_PRICE) .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView()); http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java index 5c039f9..0ada5e8 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java @@ -111,7 +111,6 @@ public class Query7Model extends NexmarkQueryModel implements Serializable { } // Keep only the highest bids. 
captureBid(event.bid); - //TODO test fails because offset of some hundreds of ms between expect and actual } } http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java index bd6c2ed..d4ca177 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java @@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.TreeMap; import org.apache.beam.integration.nexmark.NexmarkConfiguration; import org.apache.beam.integration.nexmark.NexmarkUtils; @@ -139,6 +139,24 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", start(), end(), auction, isAuctionWindow); } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + AuctionOrBidWindow that = (AuctionOrBidWindow) o; + return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction); + } + + @Override public int hashCode() { + return Objects.hash(isAuctionWindow, auction); + } } /** @@ -374,4 +392,21 @@ public 
class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct } )); } + + @Override + public int hashCode() { + return Objects.hash(auctionOrBidWindowFn); + } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + WinningBids that = (WinningBids) o; + return auctionOrBidWindowFn.equals(that.auctionOrBidWindowFn); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java index 7d74f8f..9624a9d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java @@ -181,7 +181,6 @@ public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> { return; } addResult(result); - //TODO test fails because offset of some hundreds of ms beween expect and actual return; } http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java index 4f548cd..f6deceb 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java +++ 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java @@ -26,6 +26,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Random; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.Bid; @@ -167,7 +168,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl } /** - * Return a deep clone of next event with delay added to wallclock timestamp and + * Return a deep copy of next event with delay added to wallclock timestamp and * event annotate as 'LATE'. */ public NextEvent withDelay(long delayMs) { @@ -175,6 +176,26 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark); } + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NextEvent nextEvent = (NextEvent) o; + + return (wallclockTimestamp == nextEvent.wallclockTimestamp + && eventTimestamp == nextEvent.eventTimestamp + && watermark == nextEvent.watermark + && event.equals(nextEvent.event)); + } + + @Override public int hashCode() { + return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event); + } + @Override public int compareTo(NextEvent other) { int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp); @@ -221,11 +242,12 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl } /** - * Return a deep clone of this generator. + * Return a deep copy of this generator. 
*/ - @Override - public Generator clone() { - return new Generator(config.clone(), numEvents, wallclockBaseTime); + public Generator copy() { + checkNotNull(config); + Generator result = new Generator(config, numEvents, wallclockBaseTime); + return result; } /** @@ -243,9 +265,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl */ public GeneratorConfig splitAtEventId(long eventId) { long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber); - GeneratorConfig remainConfig = config.cloneWith(config.firstEventId, + GeneratorConfig remainConfig = config.copyWith(config.firstEventId, config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents); - config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber); + config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber); return remainConfig; } http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java index 5799bb2..95c276b 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java @@ -17,6 +17,8 @@ */ package org.apache.beam.integration.nexmark.sources; +import static com.google.common.base.Preconditions.checkNotNull; + import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -29,6 +31,7 @@ import org.apache.beam.sdk.values.KV; * Parameters controlling how {@link Generator} synthesizes {@link Event} elements. 
*/ public class GeneratorConfig implements Serializable { + /** * We start the ids at specific values to help ensure the queries find a match even on * small synthesized dataset sizes. @@ -132,18 +135,13 @@ public class GeneratorConfig implements Serializable { } /** - * Return a clone of this config. - */ - @Override - public GeneratorConfig clone() { - return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); - } - - /** - * Return clone of this config except with given parameters. + * Return a copy of this config. */ - public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) { - return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); + public GeneratorConfig copy() { + GeneratorConfig result; + result = new GeneratorConfig(configuration, baseTime, firstEventId, + maxEvents, firstEventNumber); + return result; } /** @@ -164,7 +162,7 @@ public class GeneratorConfig implements Serializable { // Don't loose any events to round-down. subMaxEvents = maxEvents - subMaxEvents * (n - 1); } - results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber)); + results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber)); subFirstEventId += subMaxEvents; } } @@ -172,6 +170,13 @@ public class GeneratorConfig implements Serializable { } /** + * Return copy of this config except with given parameters. + */ + public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) { + return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); + } + + /** * Return an estimate of the bytes needed by {@code numEvents}. 
*/ public long estimatedBytesForEvents(long numEvents) { http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java index b005d65..64a8e4f 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java @@ -37,7 +37,7 @@ import org.junit.runners.JUnit4; /** Test the various NEXMark queries yield results coherent with their models. */ @RunWith(JUnit4.class) public class QueryTest { - private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone(); + private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.copy(); static { // careful, results of tests are linked to numEventGenerators because of timestamp generation @@ -55,12 +55,8 @@ public class QueryTest { if (streamingMode) { results = p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query); - //TODO Ismael this should not be called explicitly - results.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); } else { results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query); - //TODO Ismael this should not be called explicitly - results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); } PAssert.that(results).satisfies(model.assertionFor()); PipelineResult result = p.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java 
---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java index 1d04e2a..1ecc33e 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java @@ -28,7 +28,6 @@ import java.util.Set; import org.apache.beam.integration.nexmark.NexmarkConfiguration; import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.options.PipelineOptions; @@ -95,12 +94,11 @@ public class UnboundedEventSourceTest { while (n > 0) { int m = Math.min(459 + random.nextInt(455), n); - System.out.printf("reading %d...\n", m); + System.out.printf("reading %d...%n", m); checker.add(m, reader, modelGenerator); n -= m; - System.out.printf("splitting with %d remaining...\n", n); + System.out.printf("splitting with %d remaining...%n", n); CheckpointMark checkpointMark = reader.getCheckpointMark(); - assertTrue(checkpointMark instanceof Generator.Checkpoint); reader = source.createReader(options, (Generator.Checkpoint) checkpointMark); }
