Repository: beam Updated Branches: refs/heads/master f0ce31b9d -> 64ff21f35
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java new file mode 100644 index 0000000..4324b99 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of {@link Query6}. 
+ */ +public class SellerPrice implements KnownSize, Serializable { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + public static final Coder<SellerPrice> CODER = new AtomicCoder<SellerPrice>() { + @Override + public void encode(SellerPrice value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.seller, outStream, Context.NESTED); + LONG_CODER.encode(value.price, outStream, Context.NESTED); + } + + @Override + public SellerPrice decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long seller = LONG_CODER.decode(inStream, Context.NESTED); + long price = LONG_CODER.decode(inStream, Context.NESTED); + return new SellerPrice(seller, price); + } + }; + + @JsonProperty + public final long seller; + + /** Price in cents. */ + @JsonProperty + public final long price; + + // For Avro only. + @SuppressWarnings("unused") + private SellerPrice() { + seller = 0; + price = 0; + } + + public SellerPrice(long seller, long price) { + this.seller = seller; + this.price = price; + } + + @Override + public long sizeInBytes() { + return 8 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java new file mode 100644 index 0000000..2898251 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.ThreadLocalRandom; + +import javax.annotation.Nullable; + +/** + * A custom, unbounded source of event records. + * + * <p>If {@code isRateLimited} is true, events become available for return from the reader such + * that the overall rate respects the {@code interEventDelayUs} period if possible. Otherwise, + * events are returned every time the system asks for one. + */ +class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> { + private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30); + + /** Configuration for generator to use when reading synthetic events. May be split. 
*/ + private final GeneratorConfig config; + + /** How many unbounded sources to create. */ + private final int numEventGenerators; + + /** How many seconds to hold back the watermark. */ + private final long watermarkHoldbackSec; + + /** Are we rate limiting the events? */ + private final boolean isRateLimited; + + public UnboundedEventSource(GeneratorConfig config, int numEventGenerators, + long watermarkHoldbackSec, boolean isRateLimited) { + this.config = config; + this.numEventGenerators = numEventGenerators; + this.watermarkHoldbackSec = watermarkHoldbackSec; + this.isRateLimited = isRateLimited; + } + + /** A reader to pull events from the generator. */ + private class EventReader extends UnboundedReader<Event> { + /** Generator we are reading from. */ + private final Generator generator; + + /** + * Current watermark (ms since epoch). Initially set to beginning of time. + * Then updated to be the time of the next generated event. + * Then, once all events have been generated, set to the end of time. + */ + private long watermark; + + /** + * Current backlog (ms), as delay between timestamp of last returned event and the timestamp + * we should be up to according to wall-clock time. Used only for logging. + */ + private long backlogDurationMs; + + /** + * Current backlog, as estimated number of event bytes we are behind, or null if + * unknown. Reported to callers. + */ + @Nullable + private Long backlogBytes; + + /** + * Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported. + */ + private long lastReportedBacklogWallclock; + + /** + * Event time (ms since epoch) of pending event at last reported backlog, or -1 if never + * calculated. + */ + private long timestampAtLastReportedBacklogMs; + + /** Next event to make 'current' when wallclock time has advanced sufficiently. */ + @Nullable + private TimestampedValue<Event> pendingEvent; + + /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. 
*/ + private long pendingEventWallclockTime; + + /** Current event to return from getCurrent. */ + @Nullable + private TimestampedValue<Event> currentEvent; + + /** Events which have been held back so as to force them to be late. */ + private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>(); + + public EventReader(Generator generator) { + this.generator = generator; + watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis(); + lastReportedBacklogWallclock = -1; + pendingEventWallclockTime = -1; + timestampAtLastReportedBacklogMs = -1; + } + + public EventReader(GeneratorConfig config) { + this(new Generator(config)); + } + + @Override + public boolean start() { + NexmarkUtils.error("starting unbounded generator %s", generator); + return advance(); + } + + + @Override + public boolean advance() { + long now = System.currentTimeMillis(); + + while (pendingEvent == null) { + if (!generator.hasNext() && heldBackEvents.isEmpty()) { + // No more events, EVER. + if (isRateLimited) { + updateBacklog(System.currentTimeMillis(), 0); + } + if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + NexmarkUtils.error("stopped unbounded generator %s", generator); + } + return false; + } + + Generator.NextEvent next = heldBackEvents.peek(); + if (next != null && next.wallclockTimestamp <= now) { + // Time to use the held-back event. + heldBackEvents.poll(); + NexmarkUtils.error("replaying held-back event %dms behind watermark", + watermark - next.eventTimestamp); + } else if (generator.hasNext()) { + next = generator.nextEvent(); + if (isRateLimited && config.configuration.probDelayedEvent > 0.0 + && config.configuration.occasionalDelaySec > 0 + && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) { + // We'll hold back this event and go around again. 
+ long delayMs = + ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000) + + 1L; + NexmarkUtils.error("delaying event by %dms", delayMs); + heldBackEvents.add(next.withDelay(delayMs)); + continue; + } + } else { + // Waiting for held-back event to fire. + if (isRateLimited) { + updateBacklog(now, 0); + } + return false; + } + + pendingEventWallclockTime = next.wallclockTimestamp; + pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp)); + long newWatermark = + next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis(); + if (newWatermark > watermark) { + watermark = newWatermark; + } + } + + if (isRateLimited) { + if (pendingEventWallclockTime > now) { + // We want this event to fire in the future. Try again later. + updateBacklog(now, 0); + return false; + } + updateBacklog(now, now - pendingEventWallclockTime); + } + + // This event is ready to fire. + currentEvent = pendingEvent; + pendingEvent = null; + return true; + } + + private void updateBacklog(long now, long newBacklogDurationMs) { + backlogDurationMs = newBacklogDurationMs; + long interEventDelayUs = generator.currentInterEventDelayUs(); + if (interEventDelayUs != 0) { + long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs; + backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents); + } + if (lastReportedBacklogWallclock < 0 + || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) { + double timeDialation = Double.NaN; + if (pendingEvent != null + && lastReportedBacklogWallclock >= 0 + && timestampAtLastReportedBacklogMs >= 0) { + long wallclockProgressionMs = now - lastReportedBacklogWallclock; + long eventTimeProgressionMs = + pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs; + timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs; + } + NexmarkUtils.error( + "unbounded generator backlog now %dms 
(%s bytes) at %dus interEventDelay " + + "with %f time dilation", + backlogDurationMs, backlogBytes, interEventDelayUs, timeDialation); + lastReportedBacklogWallclock = now; + if (pendingEvent != null) { + timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis(); + } + } + } + + @Override + public Event getCurrent() { + if (currentEvent == null) { + throw new NoSuchElementException(); + } + return currentEvent.getValue(); + } + + @Override + public Instant getCurrentTimestamp() { + if (currentEvent == null) { + throw new NoSuchElementException(); + } + return currentEvent.getTimestamp(); + } + + @Override + public void close() { + // Nothing to close. + } + + @Override + public UnboundedEventSource getCurrentSource() { + return UnboundedEventSource.this; + } + + @Override + public Instant getWatermark() { + return new Instant(watermark); + } + + @Override + public Generator.Checkpoint getCheckpointMark() { + return generator.toCheckpoint(); + } + + @Override + public long getSplitBacklogBytes() { + return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes; + } + + @Override + public String toString() { + return String.format("EventReader(%d, %d, %d)", + generator.getCurrentConfig().getStartEventId(), generator.getNextEventId(), + generator.getCurrentConfig().getStopEventId()); + } + } + + @Override + public Coder<Generator.Checkpoint> getCheckpointMarkCoder() { + return Generator.Checkpoint.CODER_INSTANCE; + } + + @Override + public List<UnboundedEventSource> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) { + NexmarkUtils.error( + "splitting unbounded source %s into %d sub-sources", config, numEventGenerators); + List<UnboundedEventSource> results = new ArrayList<>(); + // Ignore desiredNumSplits and use numEventGenerators instead. 
+ for (GeneratorConfig subConfig : config.split(numEventGenerators)) { + results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited)); + } + return results; + } + + @Override + public EventReader createReader( + PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) { + if (checkpoint == null) { + NexmarkUtils.error("creating initial unbounded reader for %s", config); + return new EventReader(config); + } else { + NexmarkUtils.error("resuming unbounded reader from %s", checkpoint); + return new EventReader(checkpoint.toGenerator(config)); + } + } + + @Override + public void validate() { + // Nothing to validate. + } + + @Override + public Coder<Event> getDefaultOutputCoder() { + return Event.CODER; + } + + @Override + public String toString() { + return String.format( + "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java new file mode 100644 index 0000000..16f901c --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum.SumLongFn; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import com.google.common.base.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * A transform to find the winning bid for each closed auction. 
In pseudo CQL syntax: + * + * <pre> + * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime) + * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] + * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME + * GROUP BY A.id + * </pre> + * + * <p>We will also check that the winning bid is above the auction reserve. Note that + * we ignore the auction opening bid value since it has no impact on which bid eventually wins, + * if any. + * + * <p>Our implementation will use a custom windowing function in order to bring bids and + * auctions together without requiring global state. + */ +public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> { + /** Windows for open auctions and bids. */ + private static class AuctionOrBidWindow extends IntervalWindow implements Serializable { + /** Id of auction this window is for. */ + public final long auction; + + /** + * True if this window represents an actual auction, and thus has a start/end + * time matching that of the auction. False if this window represents a bid, and + * thus has an unbounded start/end time. + */ + public final boolean isAuctionWindow; + + /** For avro only. */ + private AuctionOrBidWindow() { + super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE); + auction = 0; + isAuctionWindow = false; + } + + private AuctionOrBidWindow( + Instant start, Instant end, long auctionId, boolean isAuctionWindow) { + super(start, end); + this.auction = auctionId; + this.isAuctionWindow = isAuctionWindow; + } + + /** Return an auction window for {@code auction}. */ + public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) { + AuctionOrBidWindow result = + new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true); + return result; + } + + /** + * Return a bid window for {@code bid}. It should later be merged into + * the corresponding auction window. 
However, it is possible this bid is for an already + * expired auction, or for an auction which the system has not yet seen. So we + * give the bid a bit of wiggle room in its interval. + */ + public static AuctionOrBidWindow forBid( + long expectedAuctionDurationMs, Instant timestamp, Bid bid) { + // At this point we don't know which auctions are still valid, and the bid may + // be for an auction which won't start until some unknown time in the future + // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid). + // A real system would atomically reconcile bids and auctions by a separate mechanism. + // If we give bids an unbounded window it is possible a bid for an auction which + // has already expired would cause the system watermark to stall, since that window + // would never be retired. + // Instead, we will just give the bid a finite window which expires at + // the upper bound of auctions assuming the auction starts at the same time as the bid, + // and assuming the system is running at its lowest event rate (as per interEventDelayUs). + AuctionOrBidWindow result = new AuctionOrBidWindow( + timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false); + return result; + } + + /** Is this an auction window? */ + public boolean isAuctionWindow() { + return isAuctionWindow; + } + + @Override + public String toString() { + return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", + start(), end(), auction, isAuctionWindow); + } + } + + /** + * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long. 
+ */ + private static class AuctionOrBidWindowCoder extends AtomicCoder<AuctionOrBidWindow> { + private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder(); + private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder(); + private static final Coder<Long> ID_CODER = VarLongCoder.of(); + private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + + @JsonCreator + public static AuctionOrBidWindowCoder of() { + return INSTANCE; + } + + @Override + public void encode(AuctionOrBidWindow window, OutputStream outStream, Context context) + throws IOException, CoderException { + SUPER_CODER.encode(window, outStream, Context.NESTED); + ID_CODER.encode(window.auction, outStream, Context.NESTED); + INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Context.NESTED); + } + + @Override + public AuctionOrBidWindow decode(InputStream inStream, Context context) + throws IOException, CoderException { + IntervalWindow superWindow = SUPER_CODER.decode(inStream, Context.NESTED); + long auction = ID_CODER.decode(inStream, Context.NESTED); + boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) == 0 ? false : true; + return new AuctionOrBidWindow( + superWindow.start(), superWindow.end(), auction, isAuctionWindow); + } + } + + /** Assign events to auction windows and merges them intelligently. */ + private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> { + /** Expected duration of auctions in ms. */ + private final long expectedAuctionDurationMs; + + public AuctionOrBidWindowFn(long expectedAuctionDurationMs) { + this.expectedAuctionDurationMs = expectedAuctionDurationMs; + } + + @Override + public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) { + Event event = c.element(); + if (event.newAuction != null) { + // Assign auctions to an auction window which expires at the auction's close. 
+ return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); + } else if (event.bid != null) { + // Assign bids to a temporary bid window which will later be merged into the appropriate + // auction window. + return Arrays.asList( + AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid)); + } else { + // Don't assign people to any window. They will thus be dropped. + return Arrays.asList(); + } + } + + @Override + public void mergeWindows(MergeContext c) throws Exception { + // Split and index the auction and bid windows by auction id. + Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>(); + Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>(); + for (AuctionOrBidWindow window : c.windows()) { + if (window.isAuctionWindow()) { + idToTrueAuctionWindow.put(window.auction, window); + } else { + List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction); + if (bidWindows == null) { + bidWindows = new ArrayList<>(); + idToBidAuctionWindows.put(window.auction, bidWindows); + } + bidWindows.add(window); + } + } + + // Merge all 'bid' windows into their corresponding 'auction' window, provided the + // auction has not expired. + for (long auction : idToTrueAuctionWindow.keySet()) { + AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction); + List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction); + if (bidWindows != null) { + List<AuctionOrBidWindow> toBeMerged = new ArrayList<>(); + for (AuctionOrBidWindow bidWindow : bidWindows) { + if (bidWindow.start().isBefore(auctionWindow.end())) { + toBeMerged.add(bidWindow); + } + // else: This bid window will remain until its expire time, at which point it + // will expire without ever contributing to an output. 
+ } + if (!toBeMerged.isEmpty()) { + toBeMerged.add(auctionWindow); + c.merge(toBeMerged, auctionWindow); + } + } + } + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return other instanceof AuctionOrBidWindowFn; + } + + @Override + public Coder<AuctionOrBidWindow> windowCoder() { + return AuctionOrBidWindowCoder.of(); + } + + @Override + public AuctionOrBidWindow getSideInputWindow(BoundedWindow window) { + throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs"); + } + + /** + * Below we will GBK auctions and bids on their auction ids. Then we will reduce those + * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at + * least one valid bid. We would like those output pairs to have a timestamp of the auction's + * expiry (since that's the earliest we know for sure we have the correct winner). We would + * also like to make sure that winning results are available to following stages at the auction's + * expiry. + * + * <p> + * Each result of the GBK will have a timestamp of the min of the result of this object's + * assignOutputTime over all records which end up in one of its iterables. Thus we get the + * desired behavior if we ignore each record's timestamp and always return the auction window's + * 'maxTimestamp', which will correspond to the auction's expiry. + * + * <p> + * In contrast, if this object's assignOutputTime were to return 'inputTimestamp' + * (the usual implementation), then each GBK record will take as its timestamp the minimum of + * the timestamps of all bids and auctions within it, which will always be the auction's + * timestamp. An auction which expires well into the future would thus hold up the watermark + * of the GBK results until that auction expired. That in turn would hold up all winning pairs. 
+ */ + @Override + public Instant getOutputTime( + Instant inputTimestamp, AuctionOrBidWindow window) { + return window.maxTimestamp(); + } + } + + private final AuctionOrBidWindowFn auctionOrBidWindowFn; + + public WinningBids(String name, NexmarkConfiguration configuration) { + super(name); + // What's the expected auction time (when the system is running at the lowest event rate). + long[] interEventDelayUs = configuration.rateShape.interEventDelayUs( + configuration.firstEventRate, configuration.nextEventRate, + configuration.rateUnit, configuration.numEventGenerators); + long longestDelayUs = 0; + for (int i = 0; i < interEventDelayUs.length; i++) { + longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]); + } + // Adjust for proportion of auction events amongst all events. + longestDelayUs = + (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR) + / GeneratorConfig.AUCTION_PROPORTION; + // Adjust for number of in-flight auctions. + longestDelayUs = longestDelayUs * configuration.numInFlightAuctions; + long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000; + NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs); + auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs); + } + + @Override + public PCollection<AuctionBid> apply(PCollection<Event> events) { + // Window auctions and bids into custom auction windows. New people events will be discarded. + // This will allow us to bring bids and auctions together irrespective of how long + // each auction is open for. + events = events.apply(Window.named("Window").into(auctionOrBidWindowFn)); + + // Key auctions by their id. + PCollection<KV<Long, Auction>> auctionsById = + events.apply(NexmarkQuery.JUST_NEW_AUCTIONS).apply(NexmarkQuery.AUCTION_BY_ID); + + // Key bids by their auction id. 
+ PCollection<KV<Long, Bid>> bidsByAuctionId = + events.apply(NexmarkQuery.JUST_BIDS).apply(NexmarkQuery.BID_BY_AUCTION); + + // Find the highest price valid bid for each closed auction. + return + // Join auctions and bids. + KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById) + .and(NexmarkQuery.BID_TAG, bidsByAuctionId) + .apply(CoGroupByKey.<Long>create()) + + // Filter and select. + .apply( + ParDo.named(name + ".Join") + .of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { + final Aggregator<Long, Long> noAuctionCounter = + createAggregator("noAuction", new SumLongFn()); + final Aggregator<Long, Long> underReserveCounter = + createAggregator("underReserve", new SumLongFn()); + final Aggregator<Long, Long> noValidBidsCounter = + createAggregator("noValidBids", new SumLongFn()); + + + @Override + public void processElement(ProcessContext c) { + Auction auction = + c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null); + if (auction == null) { + // We have bids without a matching auction. Give up. + noAuctionCounter.addValue(1L); + return; + } + // Find the current winning bid for auction. + // The earliest bid with the maximum price above the reserve wins. + Bid bestBid = null; + for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) { + // Bids too late for their auction will have been + // filtered out by the window merge function. + Preconditions.checkState(bid.dateTime < auction.expires); + if (bid.price < auction.reserve) { + // Bid price is below auction reserve. + underReserveCounter.addValue(1L); + continue; + } + + if (bestBid == null + || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) { + bestBid = bid; + } + } + if (bestBid == null) { + // We don't have any valid bids for auction. 
+ noValidBidsCounter.addValue(1L); + return; + } + c.output(new AuctionBid(auction, bestBid)); + } + })); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java new file mode 100644 index 0000000..b61aed1 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import javax.annotation.Nullable; + +/** + * A simulator of the {@code WinningBids} query. + */ +public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> { + /** Auctions currently still open, indexed by auction id. */ + private final Map<Long, Auction> openAuctions; + + /** The ids of auctions known to be closed. */ + private final Set<Long> closedAuctions; + + /** Current best valid bids for open auctions, indexed by auction id. */ + private final Map<Long, Bid> bestBids; + + /** Bids for auctions we haven't seen yet. */ + private final List<Bid> bidsWithoutAuctions; + + /** + * Timestamp of last new auction or bid event (ms since epoch). + */ + private long lastTimestamp; + + public WinningBidsSimulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + openAuctions = new TreeMap<>(); + closedAuctions = new TreeSet<>(); + bestBids = new TreeMap<>(); + bidsWithoutAuctions = new ArrayList<>(); + lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } + + /** + * Try to account for {@code bid} in state. Return true if bid has now been + * accounted for by {@code bestBids}. + */ + private boolean captureBestBid(Bid bid, boolean shouldLog) { + if (closedAuctions.contains(bid.auction)) { + // Ignore bids for known, closed auctions. + if (shouldLog) { + NexmarkUtils.info("closed auction: %s", bid); + } + return true; + } + Auction auction = openAuctions.get(bid.auction); + if (auction == null) { + // We don't have an auction for this bid yet, so can't determine if it is + // winning or not. 
+ if (shouldLog) { + NexmarkUtils.info("pending auction: %s", bid); + } + return false; + } + if (bid.price < auction.reserve) { + // Bid price is too low. + if (shouldLog) { + NexmarkUtils.info("below reserve: %s", bid); + } + return true; + } + Bid existingBid = bestBids.get(bid.auction); + if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) { + // We've found a (new) best bid for a known auction. + bestBids.put(bid.auction, bid); + if (shouldLog) { + NexmarkUtils.info("new winning bid: %s", bid); + } + } else { + if (shouldLog) { + NexmarkUtils.info("ignoring low bid: %s", bid); + } + } + return true; + } + + /** + * Try to match bids without auctions to auctions. + */ + private void flushBidsWithoutAuctions() { + Iterator<Bid> itr = bidsWithoutAuctions.iterator(); + while (itr.hasNext()) { + Bid bid = itr.next(); + if (captureBestBid(bid, false)) { + NexmarkUtils.info("bid now accounted for: %s", bid); + itr.remove(); + } + } + } + + /** + * Return the next winning bid for an expired auction relative to {@code timestamp}. + * Return null if no more winning bids, in which case all expired auctions will + * have been removed from our state. Retire auctions in order of expire time. 
+ */ + @Nullable + private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) { + Map<Long, List<Long>> toBeRetired = new TreeMap<>(); + for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) { + if (entry.getValue().expires <= timestamp) { + List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires); + if (idsAtTime == null) { + idsAtTime = new ArrayList<>(); + toBeRetired.put(entry.getValue().expires, idsAtTime); + } + idsAtTime.add(entry.getKey()); + } + } + for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) { + for (long id : entry.getValue()) { + Auction auction = openAuctions.get(id); + NexmarkUtils.info("retiring auction: %s", auction); + openAuctions.remove(id); + Bid bestBid = bestBids.get(id); + if (bestBid != null) { + TimestampedValue<AuctionBid> result = + TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires)); + NexmarkUtils.info("winning: %s", result); + return result; + } + } + } + return null; + } + + @Override + protected void run() { + if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { + // We may have finally seen the auction a bid was intended for. + flushBidsWithoutAuctions(); + TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp); + if (result != null) { + addResult(result); + return; + } + } + + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + // No more events. Flush any still open auctions. + TimestampedValue<AuctionBid> result = + nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + if (result == null) { + // We are done. + allDone(); + return; + } + addResult(result); + return; + } + + Event event = timestampedEvent.getValue(); + if (event.newPerson != null) { + // Ignore new person events. + return; + } + + lastTimestamp = timestampedEvent.getTimestamp().getMillis(); + if (event.newAuction != null) { + // Add this new open auction to our state. 
+ openAuctions.put(event.newAuction.id, event.newAuction); + } else { + if (!captureBestBid(event.bid, true)) { + // We don't know what to do with this bid yet. + NexmarkUtils.info("bid not yet accounted for: %s", event.bid); + bidsWithoutAuctions.add(event.bid); + } + } + // Keep looking for winning bids. + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java b/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java new file mode 100644 index 0000000..f017267 --- /dev/null +++ b/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.SourceTestUtils; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test {@link BoundedEventSource}. + */ +@RunWith(JUnit4.class) +public class BoundedEventSourceTest { + private GeneratorConfig makeConfig(long n) { + return new GeneratorConfig( + NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0); + } + + @Test + public void sourceAndReadersWork() throws Exception { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + long n = 200L; + BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); + + SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource( + source.createReader(options), options); + } + + @Test + public void splitAtFractionRespectsContract() throws Exception { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + long n = 20L; + BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); + + // Can't split if already consumed. 
+ SourceTestUtils.assertSplitAtFractionFails(source, 10, 0.3, options); + + SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(source, 5, 0.3, options); + + SourceTestUtils.assertSplitAtFractionExhaustive(source, options); + } + + @Test + public void splitIntoBundlesRespectsContract() throws Exception { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + long n = 200L; + BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1); + SourceTestUtils.assertSourcesEqualReferenceSource( + source, source.splitIntoBundles(10, options), options); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java ---------------------------------------------------------------------- diff --git a/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java b/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java new file mode 100644 index 0000000..bbaee26 --- /dev/null +++ b/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Test {@link Generator}. + */ +@RunWith(JUnit4.class) +public class GeneratorTest { + private GeneratorConfig makeConfig(long n) { + return new GeneratorConfig( + NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0); + } + + private <T> long consume(long n, Iterator<T> itr) { + for (long i = 0; i < n; i++) { + assertTrue(itr.hasNext()); + itr.next(); + } + return n; + } + + private <T> long consume(Iterator<T> itr) { + long n = 0; + while (itr.hasNext()) { + itr.next(); + n++; + } + return n; + } + + @Test + public void splitAtFractionPreservesOverallEventCount() { + long n = 55729L; + GeneratorConfig initialConfig = makeConfig(n); + long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId(); + + long actual = 0; + + Generator initialGenerator = new Generator(initialConfig); + + // Consume some events. + actual += consume(5000, initialGenerator); + + + // Split once. + GeneratorConfig remainConfig1 = initialGenerator.splitAtEventId(9000L); + Generator remainGenerator1 = new Generator(remainConfig1); + + // Consume some more events. + actual += consume(2000, initialGenerator); + actual += consume(3000, remainGenerator1); + + // Split again. + GeneratorConfig remainConfig2 = remainGenerator1.splitAtEventId(30000L); + Generator remainGenerator2 = new Generator(remainConfig2); + + // Run to completion. 
+ actual += consume(initialGenerator); + actual += consume(remainGenerator1); + actual += consume(remainGenerator2); + + assertEquals(expected, actual); + } + + @Test + public void splitPreservesOverallEventCount() { + long n = 51237L; + GeneratorConfig initialConfig = makeConfig(n); + long expected = initialConfig.getStopEventId() - initialConfig.getStartEventId(); + + List<Generator> generators = new ArrayList<>(); + for (GeneratorConfig subConfig : initialConfig.split(20)) { + generators.add(new Generator(subConfig)); + } + + long actual = 0; + for (Generator generator : generators) { + actual += consume(generator); + } + + assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java ---------------------------------------------------------------------- diff --git a/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java b/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java new file mode 100644 index 0000000..860fa78 --- /dev/null +++ b/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test the various NEXMark queries yield results coherent with their models. + */ +@RunWith(JUnit4.class) +public class QueryTest { + private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone(); + + static { + CONFIG.numEvents = 2000; + } + + /** Test {@code query} matches {@code model}. */ + private static void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) { + Pipeline p = TestPipeline.create(); + NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p); + PCollection<TimestampedValue<KnownSize>> results = + p.apply(NexmarkUtils.batchEventsSource(name, CONFIG)).apply(query); + results.setIsBoundedInternal(IsBounded.BOUNDED); + PAssert.that(results).satisfies(model.assertionFor()); + p.run(); + } + + @Test + public void query0MatchesModel() { + queryMatchesModel("Query0Test", new Query0(CONFIG), new Query0Model(CONFIG)); + } + + @Test + public void query1MatchesModel() { + queryMatchesModel("Query1Test", new Query1(CONFIG), new Query1Model(CONFIG)); + } + + @Test + public void query2MatchesModel() { + queryMatchesModel("Query2Test", new Query2(CONFIG), new Query2Model(CONFIG)); + } + + @Test + public void query3MatchesModel() { + queryMatchesModel("Query3Test", new Query3(CONFIG), new Query3Model(CONFIG)); + } + + @Test + public void query4MatchesModel() { + queryMatchesModel("Query4Test", new Query4(CONFIG), new Query4Model(CONFIG)); + } + + @Test + 
public void query5MatchesModel() { + queryMatchesModel("Query5Test", new Query5(CONFIG), new Query5Model(CONFIG)); + } + + @Test + public void query6MatchesModel() { + queryMatchesModel("Query6Test", new Query6(CONFIG), new Query6Model(CONFIG)); + } + + @Test + public void query7MatchesModel() { + queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG)); + } + + @Test + public void query8MatchesModel() { + queryMatchesModel("Query8Test", new Query8(CONFIG), new Query8Model(CONFIG)); + } + + @Test + public void query9MatchesModel() { + queryMatchesModel("Query9Test", new Query9(CONFIG), new Query9Model(CONFIG)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java b/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java new file mode 100644 index 0000000..5d72f77 --- /dev/null +++ b/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.TestPipeline; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +/** + * Test UnboundedEventSource. + */ +@RunWith(JUnit4.class) +public class UnboundedEventSourceTest { + private GeneratorConfig makeConfig(long n) { + return new GeneratorConfig( + NexmarkConfiguration.DEFAULT, System.currentTimeMillis(), 0, n, 0); + } + + /** + * Helper for tracking which ids we've seen (so we can detect dups) and + * confirming reading events match the model events. 
+ */ + private static class EventIdChecker { + private Set<Long> seenPersonIds = new HashSet<>(); + private Set<Long> seenAuctionIds = new HashSet<>(); + + public void add(Event event) { + if (event.newAuction != null) { + assertTrue(seenAuctionIds.add(event.newAuction.id)); + } else if (event.newPerson != null) { + assertTrue(seenPersonIds.add(event.newPerson.id)); + } + } + + public void add(int n, UnboundedReader<Event> reader, Generator modelGenerator) + throws IOException { + for (int i = 0; i < n; i++) { + assertTrue(modelGenerator.hasNext()); + Event modelEvent = modelGenerator.next().getValue(); + assertTrue(reader.advance()); + Event actualEvent = reader.getCurrent(); + assertEquals(modelEvent.toString(), actualEvent.toString()); + add(actualEvent); + } + } + } + + /** + * Check aggressively checkpointing and resuming a reader gives us exactly the + * same event stream as reading directly. + */ + @Test + public void resumeFromCheckpoint() throws IOException { + Random random = new Random(297); + int n = 47293; + GeneratorConfig config = makeConfig(n); + Generator modelGenerator = new Generator(config); + + EventIdChecker checker = new EventIdChecker(); + Pipeline p = TestPipeline.create(); + PipelineOptions options = p.getOptions(); + UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false); + UnboundedReader<Event> reader = source.createReader(options, null); + + while (n > 0) { + int m = Math.min(459 + random.nextInt(455), n); + System.out.printf("reading %d...\n", m); + checker.add(m, reader, modelGenerator); + n -= m; + System.out.printf("splitting with %d remaining...\n", n); + CheckpointMark checkpointMark = reader.getCheckpointMark(); + assertTrue(checkpointMark instanceof Generator.Checkpoint); + reader = source.createReader(options, (Generator.Checkpoint) checkpointMark); + } + + assertFalse(reader.advance()); + } +}
