http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java deleted file mode 100644 index 9624a9d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.queries; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import javax.annotation.Nullable; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; - -/** - * A simulator of the {@code WinningBids} query. - */ -public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> { - /** Auctions currently still open, indexed by auction id. */ - private final Map<Long, Auction> openAuctions; - - /** The ids of auctions known to be closed. */ - private final Set<Long> closedAuctions; - - /** Current best valid bids for open auctions, indexed by auction id. */ - private final Map<Long, Bid> bestBids; - - /** Bids for auctions we havn't seen yet. */ - private final List<Bid> bidsWithoutAuctions; - - /** - * Timestamp of last new auction or bid event (ms since epoch). - */ - private long lastTimestamp; - - public WinningBidsSimulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - openAuctions = new TreeMap<>(); - closedAuctions = new TreeSet<>(); - bestBids = new TreeMap<>(); - bidsWithoutAuctions = new ArrayList<>(); - lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); - } - - /** - * Try to account for {@code bid} in state. Return true if bid has now been - * accounted for by {@code bestBids}. 
- */ - private boolean captureBestBid(Bid bid, boolean shouldLog) { - if (closedAuctions.contains(bid.auction)) { - // Ignore bids for known, closed auctions. - if (shouldLog) { - NexmarkUtils.info("closed auction: %s", bid); - } - return true; - } - Auction auction = openAuctions.get(bid.auction); - if (auction == null) { - // We don't have an auction for this bid yet, so can't determine if it is - // winning or not. - if (shouldLog) { - NexmarkUtils.info("pending auction: %s", bid); - } - return false; - } - if (bid.price < auction.reserve) { - // Bid price is too low. - if (shouldLog) { - NexmarkUtils.info("below reserve: %s", bid); - } - return true; - } - Bid existingBid = bestBids.get(bid.auction); - if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) { - // We've found a (new) best bid for a known auction. - bestBids.put(bid.auction, bid); - if (shouldLog) { - NexmarkUtils.info("new winning bid: %s", bid); - } - } else { - if (shouldLog) { - NexmarkUtils.info("ignoring low bid: %s", bid); - } - } - return true; - } - - /** - * Try to match bids without auctions to auctions. - */ - private void flushBidsWithoutAuctions() { - Iterator<Bid> itr = bidsWithoutAuctions.iterator(); - while (itr.hasNext()) { - Bid bid = itr.next(); - if (captureBestBid(bid, false)) { - NexmarkUtils.info("bid now accounted for: %s", bid); - itr.remove(); - } - } - } - - /** - * Return the next winning bid for an expired auction relative to {@code timestamp}. - * Return null if no more winning bids, in which case all expired auctions will - * have been removed from our state. Retire auctions in order of expire time. 
- */ - @Nullable - private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) { - Map<Long, List<Long>> toBeRetired = new TreeMap<>(); - for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) { - if (entry.getValue().expires <= timestamp) { - List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires); - if (idsAtTime == null) { - idsAtTime = new ArrayList<>(); - toBeRetired.put(entry.getValue().expires, idsAtTime); - } - idsAtTime.add(entry.getKey()); - } - } - for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) { - for (long id : entry.getValue()) { - Auction auction = openAuctions.get(id); - NexmarkUtils.info("retiring auction: %s", auction); - openAuctions.remove(id); - Bid bestBid = bestBids.get(id); - if (bestBid != null) { - TimestampedValue<AuctionBid> result = - TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires)); - NexmarkUtils.info("winning: %s", result); - return result; - } - } - } - return null; - } - - @Override - protected void run() { - if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { - // We may have finally seen the auction a bid was intended for. - flushBidsWithoutAuctions(); - TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp); - if (result != null) { - addResult(result); - return; - } - } - - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - // No more events. Flush any still open auctions. - TimestampedValue<AuctionBid> result = - nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); - if (result == null) { - // We are done. - allDone(); - return; - } - addResult(result); - return; - } - - Event event = timestampedEvent.getValue(); - if (event.newPerson != null) { - // Ignore new person events. - return; - } - - lastTimestamp = timestampedEvent.getTimestamp().getMillis(); - if (event.newAuction != null) { - // Add this new open auction to our state. 
- openAuctions.put(event.newAuction.id, event.newAuction); - } else { - if (!captureBestBid(event.bid, true)) { - // We don't know what to do with this bid yet. - NexmarkUtils.info("bid not yet accounted for: %s", event.bid); - bidsWithoutAuctions.add(event.bid); - } - } - // Keep looking for winning bids. - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java deleted file mode 100644 index 7a56733..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Nexmark Queries. 
- */ -package org.apache.beam.integration.nexmark.queries; http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java deleted file mode 100644 index 43d6690..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.sources; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -
/**
 * A custom, bounded source of event records.
 */
public class BoundedEventSource extends BoundedSource<Event> {
  /** Configuration we generate events against. */
  private final GeneratorConfig config;

  /** How many bounded sources to create. */
  private final int numEventGenerators;

  /**
   * Create a source generating the events described by {@code config}, to be split into
   * {@code numEventGenerators} sub-sources by {@link #split}.
   */
  public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
    this.config = config;
    this.numEventGenerators = numEventGenerators;
  }

  /** A reader to pull events from the generator. */
  private static class EventReader extends BoundedReader<Event> {
    /**
     * Event source we are purporting to be reading from.
     * (We can't use Java's capture-outer-class pointer since we must update
     * this field on calls to splitAtFraction.)
     */
    private BoundedEventSource source;

    /** Generator we are reading from. */
    private final Generator generator;

    /** Whether the end of the generator has already been logged. */
    private boolean reportedStop;

    /** The most recently read event, or null if there is no current event. */
    @Nullable
    private TimestampedValue<Event> currentEvent;

    public EventReader(BoundedEventSource source, GeneratorConfig config) {
      this.source = source;
      generator = new Generator(config);
      reportedStop = false;
    }

    @Override
    public synchronized boolean start() {
      NexmarkUtils.info("starting bounded generator %s", generator);
      return advance();
    }

    /**
     * Move to the next event, returning false once the generator is exhausted. The end of
     * the generator is logged only the first time it is observed.
     */
    @Override
    public synchronized boolean advance() {
      if (!generator.hasNext()) {
        // No more events.
        if (!reportedStop) {
          reportedStop = true;
          NexmarkUtils.info("stopped bounded generator %s", generator);
        }
        // There is no current element any more; make getCurrent()/getCurrentTimestamp()
        // throw instead of silently returning the stale last event.
        currentEvent = null;
        return false;
      }
      currentEvent = generator.next();
      return true;
    }

    /**
     * Return the current event.
     *
     * @throws NoSuchElementException if there is no current event
     */
    @Override
    public synchronized Event getCurrent() throws NoSuchElementException {
      if (currentEvent == null) {
        throw new NoSuchElementException();
      }
      return currentEvent.getValue();
    }

    /**
     * Return the timestamp of the current event.
     *
     * @throws NoSuchElementException if there is no current event
     */
    @Override
    public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
      if (currentEvent == null) {
        throw new NoSuchElementException();
      }
      return currentEvent.getTimestamp();
    }

    @Override
    public void close() throws IOException {
      // Nothing to close.
    }

    @Override
    public synchronized Double getFractionConsumed() {
      return generator.getFractionConsumed();
    }

    @Override
    public synchronized BoundedSource<Event> getCurrentSource() {
      return source;
    }

    /**
     * Try to split the remaining event id range at {@code fraction} of the generator's
     * current range. On success this reader keeps the events before the split point and a
     * new source for the events from the split point onwards is returned. Return null if
     * the split point has already been passed or would leave either side empty.
     */
    @Override
    @Nullable
    public synchronized BoundedEventSource splitAtFraction(double fraction) {
      long startId = generator.getCurrentConfig().getStartEventId();
      long stopId = generator.getCurrentConfig().getStopEventId();
      long size = stopId - startId;
      // Narrow to long, not int: an int cast saturates at Integer.MAX_VALUE and would pick
      // the wrong split point for ranges holding more than 2^31 events.
      long splitEventId = startId + Math.min((long) (size * fraction), size);
      if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) {
        // Already passed this position or split results in left or right being empty.
        NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction);
        return null;
      }

      NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId);

      // Scale back the event space of the current generator, and return a generator config
      // representing the event space we just 'stole' from the current generator.
      GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId);

      NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig);

      // At this point
      //   generator.events() ++ new Generator(remainingConfig).events()
      //   == originalGenerator.events()

      // We need a new source to represent the now smaller key space for this reader, so
      // that we can maintain the invariant that
      //   this.getCurrentSource().createReader(...)
      // will yield the same output as this.
      source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators);

      // Return a source from which we may read the 'stolen' event space.
      return new BoundedEventSource(remainingConfig, source.numEventGenerators);
    }
  }

  /**
   * Split this source into sub-sources, one per config returned by
   * {@code config.split(numEventGenerators)}. {@code desiredBundleSizeBytes} is ignored.
   */
  @Override
  public List<BoundedEventSource> split(
      long desiredBundleSizeBytes, PipelineOptions options) {
    NexmarkUtils.info("slitting bounded source %s into %d sub-sources", config, numEventGenerators);
    List<BoundedEventSource> results = new ArrayList<>();
    // Ignore desiredBundleSizeBytes and use numEventGenerators instead.
    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
      results.add(new BoundedEventSource(subConfig, 1));
    }
    return results;
  }

  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    return config.getEstimatedSizeBytes();
  }

  @Override
  public EventReader createReader(PipelineOptions options) {
    NexmarkUtils.info("creating initial bounded reader for %s", config);
    return new EventReader(this, config);
  }

  @Override
  public void validate() {
    // Nothing to validate.
  }

  @Override
  public Coder<Event> getDefaultOutputCoder() {
    return Event.CODER;
  }
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java deleted file mode 100644 index f6deceb..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java +++ /dev/null @@ -1,609 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.sources; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Random; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; - -/** - * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure - * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have - * valid auction and bidder ids which can be joined to already-generated Auction and Person events. - * - * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new - * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs} - * (in microseconds). The event stream is thus fully deterministic and does not depend on - * wallclock time. - * - * <p>This class implements {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark} - * so that we can resume generating events from a saved snapshot. - */ -public class Generator implements Iterator<TimestampedValue<Event>>, Serializable { - /** - * Keep the number of categories small so the example queries will find results even with - * a small batch of events. 
- */ - private static final int NUM_CATEGORIES = 5; - - /** Smallest random string size. */ - private static final int MIN_STRING_LENGTH = 3; - - /** - * Keep the number of states small so that the example queries will find results even with - * a small batch of events. - */ - private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(",")); - - private static final List<String> US_CITIES = - Arrays.asList( - ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne") - .split(",")); - - private static final List<String> FIRST_NAMES = - Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(",")); - - private static final List<String> LAST_NAMES = - Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(",")); - - /** - * Number of yet-to-be-created people and auction ids allowed. - */ - private static final int PERSON_ID_LEAD = 10; - private static final int AUCTION_ID_LEAD = 10; - - /** - * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1 - * over these values. - */ - private static final int HOT_AUCTION_RATIO = 100; - private static final int HOT_SELLER_RATIO = 100; - private static final int HOT_BIDDER_RATIO = 100; - - /** - * Just enough state to be able to restore a generator back to where it was checkpointed. - */ - public static class Checkpoint implements UnboundedSource.CheckpointMark { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - /** Coder for this class. 
*/ - public static final Coder<Checkpoint> CODER_INSTANCE = - new CustomCoder<Checkpoint>() { - @Override public void encode(Checkpoint value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.numEvents, outStream); - LONG_CODER.encode(value.wallclockBaseTime, outStream); - } - - @Override - public Checkpoint decode(InputStream inStream) - throws CoderException, IOException { - long numEvents = LONG_CODER.decode(inStream); - long wallclockBaseTime = LONG_CODER.decode(inStream); - return new Checkpoint(numEvents, wallclockBaseTime); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - private final long numEvents; - private final long wallclockBaseTime; - - private Checkpoint(long numEvents, long wallclockBaseTime) { - this.numEvents = numEvents; - this.wallclockBaseTime = wallclockBaseTime; - } - - public Generator toGenerator(GeneratorConfig config) { - return new Generator(config, numEvents, wallclockBaseTime); - } - - @Override - public void finalizeCheckpoint() throws IOException { - // Nothing to finalize. - } - - @Override - public String toString() { - return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}", - numEvents, wallclockBaseTime); - } - } - - /** - * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then - * (arbitrary but stable) event hash order. - */ - public static class NextEvent implements Comparable<NextEvent> { - /** When, in wallclock time, should this event be emitted? */ - public final long wallclockTimestamp; - - /** When, in event time, should this event be considered to have occured? */ - public final long eventTimestamp; - - /** The event itself. */ - public final Event event; - - /** The minimum of this and all future event timestamps. 
*/ - public final long watermark; - - public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) { - this.wallclockTimestamp = wallclockTimestamp; - this.eventTimestamp = eventTimestamp; - this.event = event; - this.watermark = watermark; - } - - /** - * Return a deep copy of next event with delay added to wallclock timestamp and - * event annotate as 'LATE'. - */ - public NextEvent withDelay(long delayMs) { - return new NextEvent( - wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark); - } - - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - NextEvent nextEvent = (NextEvent) o; - - return (wallclockTimestamp == nextEvent.wallclockTimestamp - && eventTimestamp == nextEvent.eventTimestamp - && watermark == nextEvent.watermark - && event.equals(nextEvent.event)); - } - - @Override public int hashCode() { - return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event); - } - - @Override - public int compareTo(NextEvent other) { - int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp); - if (i != 0) { - return i; - } - return Integer.compare(event.hashCode(), other.event.hashCode()); - } - } - - /** - * Configuration to generate events against. Note that it may be replaced by a call to - * {@link #splitAtEventId}. - */ - private GeneratorConfig config; - - /** Number of events generated by this generator. */ - private long numEvents; - - /** - * Wallclock time at which we emitted the first event (ms since epoch). Initially -1. - */ - private long wallclockBaseTime; - - private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) { - checkNotNull(config); - this.config = config; - this.numEvents = numEvents; - this.wallclockBaseTime = wallclockBaseTime; - } - - /** - * Create a fresh generator according to {@code config}. 
- */ - public Generator(GeneratorConfig config) { - this(config, 0, -1); - } - - /** - * Return a checkpoint for the current generator. - */ - public Checkpoint toCheckpoint() { - return new Checkpoint(numEvents, wallclockBaseTime); - } - - /** - * Return a deep copy of this generator. - */ - public Generator copy() { - checkNotNull(config); - Generator result = new Generator(config, numEvents, wallclockBaseTime); - return result; - } - - /** - * Return the current config for this generator. Note that configs may be replaced by {@link - * #splitAtEventId}. - */ - public GeneratorConfig getCurrentConfig() { - return config; - } - - /** - * Mutate this generator so that it will only generate events up to but not including - * {@code eventId}. Return a config to represent the events this generator will no longer yield. - * The generators will run in on a serial timeline. - */ - public GeneratorConfig splitAtEventId(long eventId) { - long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber); - GeneratorConfig remainConfig = config.copyWith(config.firstEventId, - config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents); - config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber); - return remainConfig; - } - - /** - * Return the next 'event id'. Though events don't have ids we can simulate them to - * help with bookkeeping. - */ - public long getNextEventId() { - return config.firstEventId + config.nextAdjustedEventNumber(numEvents); - } - - /** - * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if - * due to generate a person. - */ - private long lastBase0PersonId() { - long eventId = getNextEventId(); - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset >= GeneratorConfig.PERSON_PROPORTION) { - // About to generate an auction or bid. 
- // Go back to the last person generated in this epoch. - offset = GeneratorConfig.PERSON_PROPORTION - 1; - } - // About to generate a person. - return epoch * GeneratorConfig.PERSON_PROPORTION + offset; - } - - /** - * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if - * due to generate an auction. - */ - private long lastBase0AuctionId() { - long eventId = getNextEventId(); - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset < GeneratorConfig.PERSON_PROPORTION) { - // About to generate a person. - // Go back to the last auction in the last epoch. - epoch--; - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - // About to generate a bid. - // Go back to the last auction generated in this epoch. - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else { - // About to generate an auction. - offset -= GeneratorConfig.PERSON_PROPORTION; - } - return epoch * GeneratorConfig.AUCTION_PROPORTION + offset; - } - - /** return a random US state. */ - private static String nextUSState(Random random) { - return US_STATES.get(random.nextInt(US_STATES.size())); - } - - /** Return a random US city. */ - private static String nextUSCity(Random random) { - return US_CITIES.get(random.nextInt(US_CITIES.size())); - } - - /** Return a random person name. */ - private static String nextPersonName(Random random) { - return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " " - + LAST_NAMES.get(random.nextInt(LAST_NAMES.size())); - } - - /** Return a random string of up to {@code maxLength}. 
*/ - private static String nextString(Random random, int maxLength) { - int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH); - StringBuilder sb = new StringBuilder(); - while (len-- > 0) { - if (random.nextInt(13) == 0) { - sb.append(' '); - } else { - sb.append((char) ('a' + random.nextInt(26))); - } - } - return sb.toString().trim(); - } - - /** Return a random string of exactly {@code length}. */ - private static String nextExactString(Random random, int length) { - StringBuilder sb = new StringBuilder(); - while (length-- > 0) { - sb.append((char) ('a' + random.nextInt(26))); - } - return sb.toString(); - } - - /** Return a random email address. */ - private static String nextEmail(Random random) { - return nextString(random, 7) + "@" + nextString(random, 5) + ".com"; - } - - /** Return a random credit card number. */ - private static String nextCreditCard(Random random) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 4; i++) { - if (i > 0) { - sb.append(' '); - } - sb.append(String.format("%04d", random.nextInt(10000))); - } - return sb.toString(); - } - - /** Return a random price. */ - private static long nextPrice(Random random) { - return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0); - } - - /** Return a random time delay, in milliseconds, for length of auctions. */ - private long nextAuctionLengthMs(Random random, long timestamp) { - // What's our current event number? - long currentEventNumber = config.nextAdjustedEventNumber(numEvents); - // How many events till we've generated numInFlightAuctions? - long numEventsForAuctions = - (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR) - / GeneratorConfig.AUCTION_PROPORTION; - // When will the auction numInFlightAuctions beyond now be generated? 
- long futureAuction = - config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions) - .getKey(); - // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n", - // futureAuction - timestamp, numEventsForAuctions); - // Choose a length with average horizonMs. - long horizonMs = futureAuction - timestamp; - return 1L + nextLong(random, Math.max(horizonMs * 2, 1L)); - } - - /** - * Return a random {@code string} such that {@code currentSize + string.length()} is on average - * {@code averageSize}. - */ - private static String nextExtra(Random random, int currentSize, int desiredAverageSize) { - if (currentSize > desiredAverageSize) { - return ""; - } - desiredAverageSize -= currentSize; - int delta = (int) Math.round(desiredAverageSize * 0.2); - int minSize = desiredAverageSize - delta; - int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta)); - return nextExactString(random, desiredSize); - } - - /** Return a random long from {@code [0, n)}. */ - private static long nextLong(Random random, long n) { - if (n < Integer.MAX_VALUE) { - return random.nextInt((int) n); - } else { - // WARNING: Very skewed distribution! Bad! - return Math.abs(random.nextLong() % n); - } - } - - /** - * Generate and return a random person with next available id. - */ - private Person nextPerson(Random random, long timestamp) { - long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID; - String name = nextPersonName(random); - String email = nextEmail(random); - String creditCard = nextCreditCard(random); - String city = nextUSCity(random); - String state = nextUSState(random); - int currentSize = - 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length(); - String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize); - return new Person(id, name, email, creditCard, city, state, timestamp, extra); - } - - /** - * Return a random person id (base 0). 
- */ - private long nextBase0PersonId(Random random) { - // Choose a random person from any of the 'active' people, plus a few 'leads'. - // By limiting to 'active' we ensure the density of bids or auctions per person - // does not decrease over time for long running jobs. - // By choosing a person id ahead of the last valid person id we will make - // newPerson and newAuction events appear to have been swapped in time. - long numPeople = lastBase0PersonId() + 1; - long activePeople = Math.min(numPeople, config.configuration.numActivePeople); - long n = nextLong(random, activePeople + PERSON_ID_LEAD); - return numPeople - activePeople + n; - } - - /** - * Return a random auction id (base 0). - */ - private long nextBase0AuctionId(Random random) { - // Choose a random auction for any of those which are likely to still be in flight, - // plus a few 'leads'. - // Note that ideally we'd track non-expired auctions exactly, but that state - // is difficult to split. - long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0); - long maxAuction = lastBase0AuctionId(); - return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD); - } - - /** - * Generate and return a random auction with next available id. - */ - private Auction nextAuction(Random random, long timestamp) { - long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID; - - long seller; - // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio. - if (random.nextInt(config.configuration.hotSellersRatio) > 0) { - // Choose the first person in the batch of last HOT_SELLER_RATIO people. 
- seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; - } else { - seller = nextBase0PersonId(random); - } - seller += GeneratorConfig.FIRST_PERSON_ID; - - long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); - long initialBid = nextPrice(random); - long expires = timestamp + nextAuctionLengthMs(random, timestamp); - String name = nextString(random, 20); - String desc = nextString(random, 100); - long reserve = initialBid + nextPrice(random); - int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; - String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); - return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category, - extra); - } - - /** - * Generate and return a random bid with next available id. - */ - private Bid nextBid(Random random, long timestamp) { - long auction; - // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio. - if (random.nextInt(config.configuration.hotAuctionRatio) > 0) { - // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions. - auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO; - } else { - auction = nextBase0AuctionId(random); - } - auction += GeneratorConfig.FIRST_AUCTION_ID; - - long bidder; - // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio - if (random.nextInt(config.configuration.hotBiddersRatio) > 0) { - // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of - // last HOT_BIDDER_RATIO people. 
- bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1; - } else { - bidder = nextBase0PersonId(random); - } - bidder += GeneratorConfig.FIRST_PERSON_ID; - - long price = nextPrice(random); - int currentSize = 8 + 8 + 8 + 8; - String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize); - return new Bid(auction, bidder, price, timestamp, extra); - } - - @Override - public boolean hasNext() { - return numEvents < config.maxEvents; - } - - /** - * Return the next event. The outer timestamp is in wallclock time and corresponds to - * when the event should fire. The inner timestamp is in event-time and represents the - * time the event is purported to have taken place in the simulation. - */ - public NextEvent nextEvent() { - if (wallclockBaseTime < 0) { - wallclockBaseTime = System.currentTimeMillis(); - } - // When, in event time, we should generate the event. Monotonic. - long eventTimestamp = - config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey(); - // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize - // may have local jitter. - long adjustedEventTimestamp = - config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents)) - .getKey(); - // The minimum of this and all future adjusted event timestamps. Accounts for jitter in - // the event timestamp. - long watermark = - config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents)) - .getKey(); - // When, in wallclock time, we should emit the event. - long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime); - - // Seed the random number generator with the next 'event id'. 
- Random random = new Random(getNextEventId()); - long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR; - - Event event; - if (rem < GeneratorConfig.PERSON_PROPORTION) { - event = new Event(nextPerson(random, adjustedEventTimestamp)); - } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - event = new Event(nextAuction(random, adjustedEventTimestamp)); - } else { - event = new Event(nextBid(random, adjustedEventTimestamp)); - } - - numEvents++; - return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark); - } - - @Override - public TimestampedValue<Event> next() { - NextEvent next = nextEvent(); - return TimestampedValue.of(next.event, new Instant(next.eventTimestamp)); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Return how many microseconds till we emit the next event. - */ - public long currentInterEventDelayUs() { - return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)) - .getValue(); - } - - /** - * Return an estimate of fraction of output consumed. 
- */ - public double getFractionConsumed() { - return (double) numEvents / config.maxEvents; - } - - @Override - public String toString() { - return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config, - numEvents, wallclockBaseTime); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java deleted file mode 100644 index 95c276b..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.sources; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.values.KV; - -/** - * Parameters controlling how {@link Generator} synthesizes {@link Event} elements. - */ -public class GeneratorConfig implements Serializable { - - /** - * We start the ids at specific values to help ensure the queries find a match even on - * small synthesized dataset sizes. - */ - public static final long FIRST_AUCTION_ID = 1000L; - public static final long FIRST_PERSON_ID = 1000L; - public static final long FIRST_CATEGORY_ID = 10L; - - /** - * Proportions of people/auctions/bids to synthesize. - */ - public static final int PERSON_PROPORTION = 1; - public static final int AUCTION_PROPORTION = 3; - private static final int BID_PROPORTION = 46; - public static final int PROPORTION_DENOMINATOR = - PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION; - - /** - * Environment options. - */ - public final NexmarkConfiguration configuration; - - /** - * Delay between events, in microseconds. If the array has more than one entry then - * the rate is changed every {@link #stepLengthSec}, and wraps around. - */ - private final long[] interEventDelayUs; - - /** - * Delay before changing the current inter-event delay. - */ - private final long stepLengthSec; - - /** - * Time for first event (ms since epoch). - */ - public final long baseTime; - - /** - * Event id of first event to be generated. Event ids are unique over all generators, and - * are used as a seed to generate each event's data. - */ - public final long firstEventId; - - /** - * Maximum number of events to generate. - */ - public final long maxEvents; - - /** - * First event number. 
Generators running in parallel time may share the same event number, - * and the event number is used to determine the event timestamp. - */ - public final long firstEventNumber; - - /** - * True period of epoch in milliseconds. Derived from above. - * (Ie time to run through cycle for all interEventDelayUs entries). - */ - private final long epochPeriodMs; - - /** - * Number of events per epoch. Derived from above. - * (Ie number of events to run through cycle for all interEventDelayUs entries). - */ - private final long eventsPerEpoch; - - public GeneratorConfig( - NexmarkConfiguration configuration, long baseTime, long firstEventId, - long maxEventsOrZero, long firstEventNumber) { - this.configuration = configuration; - this.interEventDelayUs = configuration.rateShape.interEventDelayUs( - configuration.firstEventRate, configuration.nextEventRate, - configuration.rateUnit, configuration.numEventGenerators); - this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec); - this.baseTime = baseTime; - this.firstEventId = firstEventId; - if (maxEventsOrZero == 0) { - // Scale maximum down to avoid overflow in getEstimatedSizeBytes. - this.maxEvents = - Long.MAX_VALUE / (PROPORTION_DENOMINATOR - * Math.max( - Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize), - configuration.avgBidByteSize)); - } else { - this.maxEvents = maxEventsOrZero; - } - this.firstEventNumber = firstEventNumber; - - long eventsPerEpoch = 0; - long epochPeriodMs = 0; - if (interEventDelayUs.length > 1) { - for (long interEventDelayU : interEventDelayUs) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU; - eventsPerEpoch += numEventsForThisCycle; - epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L; - } - } - this.eventsPerEpoch = eventsPerEpoch; - this.epochPeriodMs = epochPeriodMs; - } - - /** - * Return a copy of this config. 
- */ - public GeneratorConfig copy() { - GeneratorConfig result; - result = new GeneratorConfig(configuration, baseTime, firstEventId, - maxEvents, firstEventNumber); - return result; - } - - /** - * Split this config into {@code n} sub-configs with roughly equal number of - * possible events, but distinct value spaces. The generators will run on parallel timelines. - * This config should no longer be used. - */ - public List<GeneratorConfig> split(int n) { - List<GeneratorConfig> results = new ArrayList<>(); - if (n == 1) { - // No split required. - results.add(this); - } else { - long subMaxEvents = maxEvents / n; - long subFirstEventId = firstEventId; - for (int i = 0; i < n; i++) { - if (i == n - 1) { - // Don't loose any events to round-down. - subMaxEvents = maxEvents - subMaxEvents * (n - 1); - } - results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber)); - subFirstEventId += subMaxEvents; - } - } - return results; - } - - /** - * Return copy of this config except with given parameters. - */ - public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) { - return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); - } - - /** - * Return an estimate of the bytes needed by {@code numEvents}. - */ - public long estimatedBytesForEvents(long numEvents) { - long numPersons = - (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR; - long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR; - long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR; - return numPersons * configuration.avgPersonByteSize - + numAuctions * configuration.avgAuctionByteSize - + numBids * configuration.avgBidByteSize; - } - - /** - * Return an estimate of the byte-size of all events a generator for this config would yield. 
- */ - public long getEstimatedSizeBytes() { - return estimatedBytesForEvents(maxEvents); - } - - /** - * Return the first 'event id' which could be generated from this config. Though events don't - * have ids we can simulate them to help bookkeeping. - */ - public long getStartEventId() { - return firstEventId + firstEventNumber; - } - - /** - * Return one past the last 'event id' which could be generated from this config. - */ - public long getStopEventId() { - return firstEventId + firstEventNumber + maxEvents; - } - - /** - * Return the next event number for a generator which has so far emitted {@code numEvents}. - */ - public long nextEventNumber(long numEvents) { - return firstEventNumber + numEvents; - } - - /** - * Return the next event number for a generator which has so far emitted {@code numEvents}, - * but adjusted to account for {@code outOfOrderGroupSize}. - */ - public long nextAdjustedEventNumber(long numEvents) { - long n = configuration.outOfOrderGroupSize; - long eventNumber = nextEventNumber(numEvents); - long base = (eventNumber / n) * n; - long offset = (eventNumber * 953) % n; - return base + offset; - } - - /** - * Return the event number who's event time will be a suitable watermark for - * a generator which has so far emitted {@code numEvents}. - */ - public long nextEventNumberForWatermark(long numEvents) { - long n = configuration.outOfOrderGroupSize; - long eventNumber = nextEventNumber(numEvents); - return (eventNumber / n) * n; - } - - /** - * What timestamp should the event with {@code eventNumber} have for this generator? And - * what inter-event delay (in microseconds) is current? 
- */ - public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) { - if (interEventDelayUs.length == 1) { - long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L; - return KV.of(timestamp, interEventDelayUs[0]); - } - - long epoch = eventNumber / eventsPerEpoch; - long n = eventNumber % eventsPerEpoch; - long offsetInEpochMs = 0; - for (long interEventDelayU : interEventDelayUs) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU; - if (n < numEventsForThisCycle) { - long offsetInCycleUs = n * interEventDelayU; - long timestamp = - baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L); - return KV.of(timestamp, interEventDelayU); - } - n -= numEventsForThisCycle; - offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L; - } - throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("GeneratorConfig"); - sb.append("{configuration:"); - sb.append(configuration.toString()); - sb.append(";interEventDelayUs=["); - for (int i = 0; i < interEventDelayUs.length; i++) { - if (i > 0) { - sb.append(","); - } - sb.append(interEventDelayUs[i]); - } - sb.append("]"); - sb.append(";stepLengthSec:"); - sb.append(stepLengthSec); - sb.append(";baseTime:"); - sb.append(baseTime); - sb.append(";firstEventId:"); - sb.append(firstEventId); - sb.append(";maxEvents:"); - sb.append(maxEvents); - sb.append(";firstEventNumber:"); - sb.append(firstEventNumber); - sb.append(";epochPeriodMs:"); - sb.append(epochPeriodMs); - sb.append(";eventsPerEpoch:"); - sb.append(eventsPerEpoch); - sb.append("}"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java ---------------------------------------------------------------------- 
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java deleted file mode 100644 index 09d945d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark.sources; - -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; - -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A custom, unbounded source of event records. - * - * <p>If {@code isRateLimited} is true, events become available for return from the reader such - * that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise, - * events are returned every time the system asks for one. - */ -public class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> { - private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30); - private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class); - - /** Configuration for generator to use when reading synthetic events. May be split. */ - private final GeneratorConfig config; - - /** How many unbounded sources to create. */ - private final int numEventGenerators; - - /** How many seconds to hold back the watermark. */ - private final long watermarkHoldbackSec; - - /** Are we rate limiting the events? 
*/ - private final boolean isRateLimited; - - public UnboundedEventSource(GeneratorConfig config, int numEventGenerators, - long watermarkHoldbackSec, boolean isRateLimited) { - this.config = config; - this.numEventGenerators = numEventGenerators; - this.watermarkHoldbackSec = watermarkHoldbackSec; - this.isRateLimited = isRateLimited; - } - - /** A reader to pull events from the generator. */ - private class EventReader extends UnboundedReader<Event> { - /** Generator we are reading from. */ - private final Generator generator; - - /** - * Current watermark (ms since epoch). Initially set to beginning of time. - * Then updated to be the time of the next generated event. - * Then, once all events have been generated, set to the end of time. - */ - private long watermark; - - /** - * Current backlog (ms), as delay between timestamp of last returned event and the timestamp - * we should be up to according to wall-clock time. Used only for logging. - */ - private long backlogDurationMs; - - /** - * Current backlog, as estimated number of event bytes we are behind, or null if - * unknown. Reported to callers. - */ - @Nullable - private Long backlogBytes; - - /** - * Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported. - */ - private long lastReportedBacklogWallclock; - - /** - * Event time (ms since epoch) of pending event at last reported backlog, or -1 if never - * calculated. - */ - private long timestampAtLastReportedBacklogMs; - - /** Next event to make 'current' when wallclock time has advanced sufficiently. */ - @Nullable - private TimestampedValue<Event> pendingEvent; - - /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. */ - private long pendingEventWallclockTime; - - /** Current event to return from getCurrent. */ - @Nullable - private TimestampedValue<Event> currentEvent; - - /** Events which have been held back so as to force them to be late. 
*/ - private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>(); - - public EventReader(Generator generator) { - this.generator = generator; - watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis(); - lastReportedBacklogWallclock = -1; - pendingEventWallclockTime = -1; - timestampAtLastReportedBacklogMs = -1; - } - - public EventReader(GeneratorConfig config) { - this(new Generator(config)); - } - - @Override - public boolean start() { - LOG.trace("starting unbounded generator {}", generator); - return advance(); - } - - - @Override - public boolean advance() { - long now = System.currentTimeMillis(); - - while (pendingEvent == null) { - if (!generator.hasNext() && heldBackEvents.isEmpty()) { - // No more events, EVER. - if (isRateLimited) { - updateBacklog(System.currentTimeMillis(), 0); - } - if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { - watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); - LOG.trace("stopped unbounded generator {}", generator); - } - return false; - } - - Generator.NextEvent next = heldBackEvents.peek(); - if (next != null && next.wallclockTimestamp <= now) { - // Time to use the held-back event. - heldBackEvents.poll(); - LOG.debug("replaying held-back event {}ms behind watermark", - watermark - next.eventTimestamp); - } else if (generator.hasNext()) { - next = generator.nextEvent(); - if (isRateLimited && config.configuration.probDelayedEvent > 0.0 - && config.configuration.occasionalDelaySec > 0 - && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) { - // We'll hold back this event and go around again. - long delayMs = - ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000) - + 1L; - LOG.debug("delaying event by {}ms", delayMs); - heldBackEvents.add(next.withDelay(delayMs)); - continue; - } - } else { - // Waiting for held-back event to fire. 
- if (isRateLimited) { - updateBacklog(now, 0); - } - return false; - } - - pendingEventWallclockTime = next.wallclockTimestamp; - pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp)); - long newWatermark = - next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis(); - if (newWatermark > watermark) { - watermark = newWatermark; - } - } - - if (isRateLimited) { - if (pendingEventWallclockTime > now) { - // We want this event to fire in the future. Try again later. - updateBacklog(now, 0); - return false; - } - updateBacklog(now, now - pendingEventWallclockTime); - } - - // This event is ready to fire. - currentEvent = pendingEvent; - pendingEvent = null; - return true; - } - - private void updateBacklog(long now, long newBacklogDurationMs) { - backlogDurationMs = newBacklogDurationMs; - long interEventDelayUs = generator.currentInterEventDelayUs(); - if (interEventDelayUs != 0) { - long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs; - backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents); - } - if (lastReportedBacklogWallclock < 0 - || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) { - double timeDialation = Double.NaN; - if (pendingEvent != null - && lastReportedBacklogWallclock >= 0 - && timestampAtLastReportedBacklogMs >= 0) { - long wallclockProgressionMs = now - lastReportedBacklogWallclock; - long eventTimeProgressionMs = - pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs; - timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs; - } - LOG.debug( - "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay " - + "with {} time dilation", - backlogDurationMs, backlogBytes, interEventDelayUs, timeDialation); - lastReportedBacklogWallclock = now; - if (pendingEvent != null) { - timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis(); - } - } - } - 
- @Override - public Event getCurrent() { - if (currentEvent == null) { - throw new NoSuchElementException(); - } - return currentEvent.getValue(); - } - - @Override - public Instant getCurrentTimestamp() { - if (currentEvent == null) { - throw new NoSuchElementException(); - } - return currentEvent.getTimestamp(); - } - - @Override - public void close() { - // Nothing to close. - } - - @Override - public UnboundedEventSource getCurrentSource() { - return UnboundedEventSource.this; - } - - @Override - public Instant getWatermark() { - return new Instant(watermark); - } - - @Override - public Generator.Checkpoint getCheckpointMark() { - return generator.toCheckpoint(); - } - - @Override - public long getSplitBacklogBytes() { - return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes; - } - - @Override - public String toString() { - return String.format("EventReader(%d, %d, %d)", - generator.getCurrentConfig().getStartEventId(), generator.getNextEventId(), - generator.getCurrentConfig().getStopEventId()); - } - } - - @Override - public Coder<Generator.Checkpoint> getCheckpointMarkCoder() { - return Generator.Checkpoint.CODER_INSTANCE; - } - - @Override - public List<UnboundedEventSource> split( - int desiredNumSplits, PipelineOptions options) { - LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators); - List<UnboundedEventSource> results = new ArrayList<>(); - // Ignore desiredNumSplits and use numEventGenerators instead. 
- for (GeneratorConfig subConfig : config.split(numEventGenerators)) { - results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited)); - } - return results; - } - - @Override - public EventReader createReader( - PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) { - if (checkpoint == null) { - LOG.trace("creating initial unbounded reader for {}", config); - return new EventReader(config); - } else { - LOG.trace("resuming unbounded reader from {}", checkpoint); - return new EventReader(checkpoint.toGenerator(config)); - } - } - - @Override - public void validate() { - // Nothing to validate. - } - - @Override - public Coder<Event> getDefaultOutputCoder() { - return Event.CODER; - } - - @Override - public String toString() { - return String.format( - "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java deleted file mode 100644 index ceaec9d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Nexmark Synthetic Sources. - */ -package org.apache.beam.integration.nexmark.sources; http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties deleted file mode 100644 index 7dd57b5..0000000 --- a/integration/java/nexmark/src/main/resources/log4j.properties +++ /dev/null @@ -1,55 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# Set everything to be logged to the console -log4j.rootCategory=DEBUG, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n - -# General Beam loggers -log4j.logger.org.apache.beam.runners.direct=WARN -log4j.logger.org.apache.beam.sdk=WARN - -# Nexmark specific -log4j.logger.org.apache.beam.integration.nexmark=WARN - -# Settings to quiet third party logs that are too verbose -log4j.logger.org.spark_project.jetty=WARN -log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR - -# Setting to quiet spark logs, Beam logs should standout -log4j.logger.org.apache.beam.runners.spark=INFO -log4j.logger.org.apache.spark=WARN -log4j.logger.org.spark-project=WARN -log4j.logger.io.netty=INFO - -# Settings to quiet flink logs -log4j.logger.org.apache.flink=WARN - -# Settings to quiet apex logs -log4j.logger.org.apache.beam.runners.apex=INFO -log4j.logger.com.datatorrent=ERROR -log4j.logger.org.apache.hadoop.metrics2=WARN -log4j.logger.org.apache.commons=WARN -log4j.logger.org.apache.hadoop.security=WARN -log4j.logger.org.apache.hadoop.util=WARN - -# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support -log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL -log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java deleted file mode 100644 index 
64a8e4f..0000000 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.UsesStatefulParDo; -import org.apache.beam.sdk.testing.UsesTimersInParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Test the various NEXMark queries yield results coherent with their models. 
*/ -@RunWith(JUnit4.class) -public class QueryTest { - private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.copy(); - - static { - // careful, results of tests are linked to numEventGenerators because of timestamp generation - CONFIG.numEventGenerators = 1; - CONFIG.numEvents = 1000; - } - - @Rule public TestPipeline p = TestPipeline.create(); - - /** Test {@code query} matches {@code model}. */ - private void queryMatchesModel( - String name, NexmarkQuery query, NexmarkQueryModel model, boolean streamingMode) { - NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p); - PCollection<TimestampedValue<KnownSize>> results; - if (streamingMode) { - results = - p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query); - } else { - results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query); - } - PAssert.that(results).satisfies(model.assertionFor()); - PipelineResult result = p.run(); - result.waitUntilFinish(); - } - - @Test - @Category(NeedsRunner.class) - public void query0MatchesModelBatch() { - queryMatchesModel("Query0TestBatch", new Query0(CONFIG), new Query0Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query0MatchesModelStreaming() { - queryMatchesModel("Query0TestStreaming", new Query0(CONFIG), new Query0Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query1MatchesModelBatch() { - queryMatchesModel("Query1TestBatch", new Query1(CONFIG), new Query1Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query1MatchesModelStreaming() { - queryMatchesModel("Query1TestStreaming", new Query1(CONFIG), new Query1Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query2MatchesModelBatch() { - queryMatchesModel("Query2TestBatch", new Query2(CONFIG), new Query2Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void 
query2MatchesModelStreaming() { - queryMatchesModel("Query2TestStreaming", new Query2(CONFIG), new Query2Model(CONFIG), true); - } - - @Test - @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class}) - public void query3MatchesModelBatch() { - queryMatchesModel("Query3TestBatch", new Query3(CONFIG), new Query3Model(CONFIG), false); - } - - @Test - @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class}) - public void query3MatchesModelStreaming() { - queryMatchesModel("Query3TestStreaming", new Query3(CONFIG), new Query3Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query4MatchesModelBatch() { - queryMatchesModel("Query4TestBatch", new Query4(CONFIG), new Query4Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query4MatchesModelStreaming() { - queryMatchesModel("Query4TestStreaming", new Query4(CONFIG), new Query4Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query5MatchesModelBatch() { - queryMatchesModel("Query5TestBatch", new Query5(CONFIG), new Query5Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query5MatchesModelStreaming() { - queryMatchesModel("Query5TestStreaming", new Query5(CONFIG), new Query5Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query6MatchesModelBatch() { - queryMatchesModel("Query6TestBatch", new Query6(CONFIG), new Query6Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query6MatchesModelStreaming() { - queryMatchesModel("Query6TestStreaming", new Query6(CONFIG), new Query6Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query7MatchesModelBatch() { - queryMatchesModel("Query7TestBatch", new Query7(CONFIG), new Query7Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query7MatchesModelStreaming() { - 
queryMatchesModel("Query7TestStreaming", new Query7(CONFIG), new Query7Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query8MatchesModelBatch() { - queryMatchesModel("Query8TestBatch", new Query8(CONFIG), new Query8Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query8MatchesModelStreaming() { - queryMatchesModel("Query8TestStreaming", new Query8(CONFIG), new Query8Model(CONFIG), true); - } - - @Test - @Category(NeedsRunner.class) - public void query9MatchesModelBatch() { - queryMatchesModel("Query9TestBatch", new Query9(CONFIG), new Query9Model(CONFIG), false); - } - - @Test - @Category(NeedsRunner.class) - public void query9MatchesModelStreaming() { - queryMatchesModel("Query9TestStreaming", new Query9(CONFIG), new Query9Model(CONFIG), true); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java deleted file mode 100644 index d95461a..0000000 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.sources; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.SourceTestUtils; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test {@link BoundedEventSource}. - */ -@RunWith(JUnit4.class) -public class BoundedEventSourceTest { - private GeneratorConfig makeConfig(long n) { - return new GeneratorConfig( - NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0); - } - - @Test - public void sourceAndReadersWork() throws Exception { - NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class); - long n = 200L; - BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); - - SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource( - source.createReader(options), options); - } - - @Test - public void splitAtFractionRespectsContract() throws Exception { - NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class); - long n = 20L; - BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); - - // Can't split if already consumed. 
- SourceTestUtils.assertSplitAtFractionFails(source, 10, 0.3, options); - - SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(source, 5, 0.3, options); - - SourceTestUtils.assertSplitAtFractionExhaustive(source, options); - } - - @Test - public void splitIntoBundlesRespectsContract() throws Exception { - NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class); - long n = 200L; - BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); - SourceTestUtils.assertSourcesEqualReferenceSource( - source, source.split(10, options), options); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java deleted file mode 100644 index b0dff2f..0000000 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/GeneratorTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.sources; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test {@link Generator}. - */ -@RunWith(JUnit4.class) -public class GeneratorTest { - private GeneratorConfig makeConfig(long n) { - return new GeneratorConfig( - NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0); - } - - private <T> long consume(long n, Iterator<T> itr) { - for (long i = 0; i < n; i++) { - assertTrue(itr.hasNext()); - itr.next(); - } - return n; - } - - private <T> long consume(Iterator<T> itr) { - long n = 0; - while (itr.hasNext()) { - itr.next(); - n++; - } - return n; - } - - @Test - public void splitAtFractionPreservesOverallEventCount() { - long n = 55729L; - GeneratorConfig initialConfig = makeConfig(n); - long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId(); - - long actual = 0; - - Generator initialGenerator = new Generator(initialConfig); - - // Consume some events. - actual += consume(5000, initialGenerator); - - - // Split once. - GeneratorConfig remainConfig1 = initialGenerator.splitAtEventId(9000L); - Generator remainGenerator1 = new Generator(remainConfig1); - - // Consume some more events. - actual += consume(2000, initialGenerator); - actual += consume(3000, remainGenerator1); - - // Split again. - GeneratorConfig remainConfig2 = remainGenerator1.splitAtEventId(30000L); - Generator remainGenerator2 = new Generator(remainConfig2); - - // Run to completion. 
- actual += consume(initialGenerator); - actual += consume(remainGenerator1); - actual += consume(remainGenerator2); - - assertEquals(expected, actual); - } - - @Test - public void splitPreservesOverallEventCount() { - long n = 51237L; - GeneratorConfig initialConfig = makeConfig(n); - long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId(); - - List<Generator> generators = new ArrayList<>(); - for (GeneratorConfig subConfig : initialConfig.split(20)) { - generators.add(new Generator(subConfig)); - } - - long actual = 0; - for (Generator generator : generators) { - actual += consume(generator); - } - - assertEquals(expected, actual); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java deleted file mode 100644 index 1ecc33e..0000000 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.sources; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; -import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.TestPipeline; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test UnboundedEventSource. - */ -@RunWith(JUnit4.class) -public class UnboundedEventSourceTest { - private GeneratorConfig makeConfig(long n) { - return new GeneratorConfig( - NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0); - } - - /** - * Helper for tracking which ids we've seen (so we can detect dups) and - * confirming reading events match the model events. 
- */ - private static class EventIdChecker { - private final Set<Long> seenPersonIds = new HashSet<>(); - private final Set<Long> seenAuctionIds = new HashSet<>(); - - public void add(Event event) { - if (event.newAuction != null) { - assertTrue(seenAuctionIds.add(event.newAuction.id)); - } else if (event.newPerson != null) { - assertTrue(seenPersonIds.add(event.newPerson.id)); - } - } - - public void add(int n, UnboundedReader<Event> reader, Generator modelGenerator) - throws IOException { - for (int i = 0; i < n; i++) { - assertTrue(modelGenerator.hasNext()); - Event modelEvent = modelGenerator.next().getValue(); - assertTrue(reader.advance()); - Event actualEvent = reader.getCurrent(); - assertEquals(modelEvent.toString(), actualEvent.toString()); - add(actualEvent); - } - } - } - - /** - * Check aggressively checkpointing and resuming a reader gives us exactly the - * same event stream as reading directly. - */ - @Test - public void resumeFromCheckpoint() throws IOException { - Random random = new Random(297); - int n = 47293; - GeneratorConfig config = makeConfig(n); - Generator modelGenerator = new Generator(config); - - EventIdChecker checker = new EventIdChecker(); - PipelineOptions options = TestPipeline.testingPipelineOptions(); - UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false); - UnboundedReader<Event> reader = source.createReader(options, null); - - while (n > 0) { - int m = Math.min(459 + random.nextInt(455), n); - System.out.printf("reading %d...%n", m); - checker.add(m, reader, modelGenerator); - n -= m; - System.out.printf("splitting with %d remaining...%n", n); - CheckpointMark checkpointMark = reader.getCheckpointMark(); - reader = source.createReader(options, (Generator.Checkpoint) checkpointMark); - } - - assertFalse(reader.advance()); - } -}
