Repository: beam
Updated Branches:
  refs/heads/master f0ce31b9d -> 64ff21f35


http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java
 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java
new file mode 100644
index 0000000..4324b99
--- /dev/null
+++ 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query6}.
+ */
+public class SellerPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<SellerPrice> CODER = new 
AtomicCoder<SellerPrice>() {
+    @Override
+    public void encode(SellerPrice value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.seller, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+    }
+
+    @Override
+    public SellerPrice decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long seller = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      return new SellerPrice(seller, price);
+    }
+  };
+
+  @JsonProperty
+  public final long seller;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private SellerPrice() {
+    seller = 0;
+    price = 0;
+  }
+
+  public SellerPrice(long seller, long price) {
+    this.seller = seller;
+    this.price = price;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
new file mode 100644
index 0000000..2898251
--- /dev/null
+++ 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.annotation.Nullable;
+
+/**
+ * A custom, unbounded source of event records.
+ *
+ * <p>If {@code isRateLimited} is true, events become available for return 
from the reader such
+ * that the overall rate respects the {@code interEventDelayUs} period if 
possible. Otherwise,
+ * events are returned every time the system asks for one.
+ */
+class UnboundedEventSource extends UnboundedSource<Event, 
Generator.Checkpoint> {
+  private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
+
+  /** Configuration for generator to use when reading synthetic events. May be 
split. */
+  private final GeneratorConfig config;
+
+  /** How many unbounded sources to create. */
+  private final int numEventGenerators;
+
+  /** How many seconds to hold back the watermark. */
+  private final long watermarkHoldbackSec;
+
+  /** Are we rate limiting the events? */
+  private final boolean isRateLimited;
+
+  public UnboundedEventSource(GeneratorConfig config, int numEventGenerators,
+      long watermarkHoldbackSec, boolean isRateLimited) {
+    this.config = config;
+    this.numEventGenerators = numEventGenerators;
+    this.watermarkHoldbackSec = watermarkHoldbackSec;
+    this.isRateLimited = isRateLimited;
+  }
+
+  /** A reader to pull events from the generator. */
+  private class EventReader extends UnboundedReader<Event> {
+    /** Generator we are reading from. */
+    private final Generator generator;
+
+    /**
+     * Current watermark (ms since epoch). Initially set to beginning of time.
+     * Then updated to be the time of the next generated event.
+     * Then, once all events have been generated, set to the end of time.
+     */
+    private long watermark;
+
+    /**
+     * Current backlog (ms), as delay between timestamp of last returned event 
and the timestamp
+     * we should be up to according to wall-clock time. Used only for logging.
+     */
+    private long backlogDurationMs;
+
+    /**
+     * Current backlog, as estimated number of event bytes we are behind, or 
null if
+     * unknown. Reported to callers.
+     */
+    @Nullable
+    private Long backlogBytes;
+
+    /**
+     * Wallclock time (ms since epoch) we last reported the backlog, or -1 if 
never reported.
+     */
+    private long lastReportedBacklogWallclock;
+
+    /**
+     * Event time (ms since epoch) of pending event at last reported backlog, 
or -1 if never
+     * calculated.
+     */
+    private long timestampAtLastReportedBacklogMs;
+
+    /** Next event to make 'current' when wallclock time has advanced 
sufficiently. */
+    @Nullable
+    private TimestampedValue<Event> pendingEvent;
+
+    /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending 
event. */
+    private long pendingEventWallclockTime;
+
+    /** Current event to return from getCurrent. */
+    @Nullable
+    private TimestampedValue<Event> currentEvent;
+
+    /** Events which have been held back so as to force them to be late. */
+    private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
+
+    public EventReader(Generator generator) {
+      this.generator = generator;
+      watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis();
+      lastReportedBacklogWallclock = -1;
+      pendingEventWallclockTime = -1;
+      timestampAtLastReportedBacklogMs = -1;
+    }
+
+    public EventReader(GeneratorConfig config) {
+      this(new Generator(config));
+    }
+
+    @Override
+    public boolean start() {
+      NexmarkUtils.error("starting unbounded generator %s", generator);
+      return advance();
+    }
+
+
+    /**
+     * Tries to make the next event current.
+     *
+     * <p>Loops until there is a pending event, taking it either from the held-back queue
+     * (once the held-back event's wallclock time has been reached) or from the generator.
+     * When rate limiting is enabled, a freshly generated event may instead be held back and
+     * the pending event is only promoted to current once its wallclock time has been reached.
+     *
+     * @return true if a new current event is available; false if nothing is available yet
+     *     or if the generator is exhausted and no held-back events remain
+     */
+    @Override
+    public boolean advance() {
+      // Read the clock once so every decision and backlog report in this call agrees.
+      long now = System.currentTimeMillis();
+
+      while (pendingEvent == null) {
+        if (!generator.hasNext() && heldBackEvents.isEmpty()) {
+          // No more events, EVER.
+          if (isRateLimited) {
+            updateBacklog(now, 0);
+          }
+          if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+            watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+            NexmarkUtils.error("stopped unbounded generator %s", generator);
+          }
+          return false;
+        }
+
+        Generator.NextEvent next = heldBackEvents.peek();
+        if (next != null && next.wallclockTimestamp <= now) {
+          // Time to use the held-back event.
+          heldBackEvents.poll();
+          NexmarkUtils.error("replaying held-back event %dms behind watermark",
+                             watermark - next.eventTimestamp);
+        } else if (generator.hasNext()) {
+          next = generator.nextEvent();
+          if (isRateLimited && config.configuration.probDelayedEvent > 0.0
+              && config.configuration.occasionalDelaySec > 0
+              && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) {
+            // We'll hold back this event and go around again.
+            long delayMs =
+                ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000)
+                + 1L;
+            NexmarkUtils.error("delaying event by %dms", delayMs);
+            heldBackEvents.add(next.withDelay(delayMs));
+            continue;
+          }
+        } else {
+          // Waiting for held-back event to fire.
+          if (isRateLimited) {
+            updateBacklog(now, 0);
+          }
+          return false;
+        }
+
+        pendingEventWallclockTime = next.wallclockTimestamp;
+        pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
+        // The watermark only ever moves forward, lagging the generator by the holdback.
+        long newWatermark =
+            next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis();
+        if (newWatermark > watermark) {
+          watermark = newWatermark;
+        }
+      }
+
+      if (isRateLimited) {
+        if (pendingEventWallclockTime > now) {
+          // We want this event to fire in the future. Try again later.
+          updateBacklog(now, 0);
+          return false;
+        }
+        updateBacklog(now, now - pendingEventWallclockTime);
+      }
+
+      // This event is ready to fire.
+      currentEvent = pendingEvent;
+      pendingEvent = null;
+      return true;
+    }
+
+    /**
+     * Records the current backlog and, at most once per {@code BACKLOG_PERIOD}, logs it.
+     *
+     * @param now wallclock time (ms since epoch) of this update
+     * @param newBacklogDurationMs how far (ms) the reader is behind wallclock time
+     */
+    private void updateBacklog(long now, long newBacklogDurationMs) {
+      backlogDurationMs = newBacklogDurationMs;
+      long interEventDelayUs = generator.currentInterEventDelayUs();
+      if (interEventDelayUs != 0) {
+        // Round up: any partial inter-event period counts as one outstanding event.
+        long backlogEvents =
+            (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
+        backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
+      }
+
+      boolean reportDue =
+          lastReportedBacklogWallclock < 0
+          || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis();
+      if (!reportDue) {
+        return;
+      }
+
+      // Ratio of event-time progress to wallclock progress since the previous report;
+      // NaN when there is no previous report or no pending event to measure against.
+      double timeDilation = Double.NaN;
+      if (pendingEvent != null
+          && lastReportedBacklogWallclock >= 0
+          && timestampAtLastReportedBacklogMs >= 0) {
+        long wallclockDeltaMs = now - lastReportedBacklogWallclock;
+        long eventTimeDeltaMs =
+            pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs;
+        timeDilation = (double) eventTimeDeltaMs / (double) wallclockDeltaMs;
+      }
+      NexmarkUtils.error(
+          "unbounded generator backlog now %dms (%s bytes) at %dus interEventDelay "
+          + "with %f time dilation",
+          backlogDurationMs, backlogBytes, interEventDelayUs, timeDilation);
+      lastReportedBacklogWallclock = now;
+      if (pendingEvent != null) {
+        timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis();
+      }
+    }
+
+    @Override
+    public Event getCurrent() {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getValue();
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getTimestamp();
+    }
+
+    @Override
+    public void close() {
+      // Nothing to close.
+    }
+
+    @Override
+    public UnboundedEventSource getCurrentSource() {
+      return UnboundedEventSource.this;
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return new Instant(watermark);
+    }
+
+    @Override
+    public Generator.Checkpoint getCheckpointMark() {
+      return generator.toCheckpoint();
+    }
+
+    @Override
+    public long getSplitBacklogBytes() {
+      return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("EventReader(%d, %d, %d)",
+          generator.getCurrentConfig().getStartEventId(), 
generator.getNextEventId(),
+          generator.getCurrentConfig().getStopEventId());
+    }
+  }
+
+  @Override
+  public Coder<Generator.Checkpoint> getCheckpointMarkCoder() {
+    return Generator.Checkpoint.CODER_INSTANCE;
+  }
+
+  /**
+   * Splits this source into one single-generator sub-source per sub-configuration returned
+   * by {@code config.split(numEventGenerators)}.
+   *
+   * @param desiredNumSplits ignored; {@code numEventGenerators} decides the split count
+   * @param options unused
+   */
+  @Override
+  public List<UnboundedEventSource> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) {
+    NexmarkUtils.error(
+        "splitting unbounded source %s into %d sub-sources", config, numEventGenerators);
+    // Ignore desiredNumSplits and use numEventGenerators instead.
+    List<UnboundedEventSource> subSources = new ArrayList<>();
+    for (GeneratorConfig part : config.split(numEventGenerators)) {
+      UnboundedEventSource subSource =
+          new UnboundedEventSource(part, 1, watermarkHoldbackSec, isRateLimited);
+      subSources.add(subSource);
+    }
+    return subSources;
+  }
+
+  /**
+   * Creates a reader over this source's configuration, resuming from {@code checkpoint}
+   * when one is supplied and starting from the beginning otherwise.
+   */
+  @Override
+  public EventReader createReader(
+      PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) {
+    if (checkpoint != null) {
+      NexmarkUtils.error("resuming unbounded reader from %s", checkpoint);
+      return new EventReader(checkpoint.toGenerator(config));
+    }
+    NexmarkUtils.error("creating initial unbounded reader for %s", config);
+    return new EventReader(config);
+  }
+
+  @Override
+  public void validate() {
+    // Nothing to validate.
+  }
+
+  @Override
+  public Coder<Event> getDefaultOutputCoder() {
+    return Event.CODER;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "UnboundedEventSource(%d, %d)", config.getStartEventId(), 
config.getStopEventId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
new file mode 100644
index 0000000..16f901c
--- /dev/null
+++ 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import com.google.common.base.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A transform to find the winning bid for each closed auction. In pseudo CQL 
syntax:
+ *
+ * <pre>
+ * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
+ * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
+ * GROUP BY A.id
+ * </pre>
+ *
+ * <p>We will also check that the winning bid is above the auction reserve. 
Note that
+ * we ignore the auction opening bid value since it has no impact on which bid 
eventually wins,
+ * if any.
+ *
+ * <p>Our implementation will use a custom windowing function in order to 
bring bids and
+ * auctions together without requiring global state.
+ */
+public class WinningBids extends PTransform<PCollection<Event>, 
PCollection<AuctionBid>> {
+  /** Windows for open auctions and bids. */
+  private static class AuctionOrBidWindow extends IntervalWindow implements 
Serializable {
+    /** Id of auction this window is for. */
+    public final long auction;
+
+    /**
+     * True if this window represents an actual auction, and thus has a 
start/end
+     * time matching that of the auction. False if this window represents a 
bid, and
+     * thus has an unbounded start/end time.
+     */
+    public final boolean isAuctionWindow;
+
+    /** For avro only. */
+    private AuctionOrBidWindow() {
+      super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
+      auction = 0;
+      isAuctionWindow = false;
+    }
+
+    private AuctionOrBidWindow(
+        Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
+      super(start, end);
+      this.auction = auctionId;
+      this.isAuctionWindow = isAuctionWindow;
+    }
+
+    /** Return an auction window for {@code auction}. */
+    public static AuctionOrBidWindow forAuction(Instant timestamp, Auction 
auction) {
+      AuctionOrBidWindow result =
+          new AuctionOrBidWindow(timestamp, new Instant(auction.expires), 
auction.id, true);
+      return result;
+    }
+
+    /**
+     * Return a bid window for {@code bid}. It should later be merged into
+     * the corresponding auction window. However, it is possible this bid is 
for an already
+     * expired auction, or for an auction which the system has not yet seen. 
So we
+     * give the bid a bit of wiggle room in its interval.
+     */
+    public static AuctionOrBidWindow forBid(
+        long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
+      // At this point we don't know which auctions are still valid, and the 
bid may
+      // be for an auction which won't start until some unknown time in the 
future
+      // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
+      // A real system would atomically reconcile bids and auctions by a 
separate mechanism.
+      // If we give bids an unbounded window it is possible a bid for an 
auction which
+      // has already expired would cause the system watermark to stall, since 
that window
+      // would never be retired.
+      // Instead, we will just give the bid a finite window which expires at
+      // the upper bound of auctions assuming the auction starts at the same 
time as the bid,
+      // and assuming the system is running at its lowest event rate (as per 
interEventDelayUs).
+      AuctionOrBidWindow result = new AuctionOrBidWindow(
+          timestamp, timestamp.plus(expectedAuctionDurationMs * 2), 
bid.auction, false);
+      return result;
+    }
+
+    /** Is this an auction window? */
+    public boolean isAuctionWindow() {
+      return isAuctionWindow;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; 
isAuctionWindow:%s}",
+          start(), end(), auction, isAuctionWindow);
+    }
+  }
+
+  /**
+   * Coder for {@link AuctionOrBidWindow}. The encoding is the window's {@link IntervalWindow}
+   * part, then the auction id as a long, then the auction/bid flag as an int (1 or 0).
+   */
+  private static class AuctionOrBidWindowCoder extends AtomicCoder<AuctionOrBidWindow> {
+    private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
+    private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
+    private static final Coder<Long> ID_CODER = VarLongCoder.of();
+    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+    /** Returns the shared coder instance. */
+    @JsonCreator
+    public static AuctionOrBidWindowCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(AuctionOrBidWindow window, OutputStream outStream, Context context)
+        throws IOException, CoderException {
+      int auctionFlag = window.isAuctionWindow ? 1 : 0;
+      SUPER_CODER.encode(window, outStream, Context.NESTED);
+      ID_CODER.encode(window.auction, outStream, Context.NESTED);
+      INT_CODER.encode(auctionFlag, outStream, Context.NESTED);
+    }
+
+    @Override
+    public AuctionOrBidWindow decode(InputStream inStream, Context context)
+        throws IOException, CoderException {
+      // Components are read back in exactly the order encode wrote them.
+      IntervalWindow interval = SUPER_CODER.decode(inStream, Context.NESTED);
+      long auctionId = ID_CODER.decode(inStream, Context.NESTED);
+      int auctionFlag = INT_CODER.decode(inStream, Context.NESTED);
+      return new AuctionOrBidWindow(
+          interval.start(), interval.end(), auctionId, auctionFlag != 0);
+    }
+  }
+
+  /** Assigns events to auction windows and merges them intelligently. */
+  private static class AuctionOrBidWindowFn extends WindowFn<Event, 
AuctionOrBidWindow> {
+    /** Expected duration of auctions in ms. */
+    private final long expectedAuctionDurationMs;
+
+    public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
+      this.expectedAuctionDurationMs = expectedAuctionDurationMs;
+    }
+
+    @Override
+    public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) {
+      Event event = c.element();
+      if (event.newAuction != null) {
+        // Assign auctions to an auction window which expires at the auction's 
close.
+        return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), 
event.newAuction));
+      } else if (event.bid != null) {
+        // Assign bids to a temporary bid window which will later be merged 
into the appropriate
+        // auction window.
+        return Arrays.asList(
+            AuctionOrBidWindow.forBid(expectedAuctionDurationMs, 
c.timestamp(), event.bid));
+      } else {
+        // Don't assign people to any window. They will thus be dropped.
+        return Arrays.asList();
+      }
+    }
+
+    /**
+     * Merges each bid window into the auction window with the same auction id, provided the
+     * bid window starts before that auction window ends. Bid windows with no matching
+     * auction window, or which start too late, are left untouched.
+     */
+    @Override
+    public void mergeWindows(MergeContext c) throws Exception {
+      // Split and index the auction and bid windows by auction id.
+      Map<Long, AuctionOrBidWindow> auctionWindowById = new TreeMap<>();
+      Map<Long, List<AuctionOrBidWindow>> bidWindowsById = new TreeMap<>();
+      for (AuctionOrBidWindow window : c.windows()) {
+        if (window.isAuctionWindow()) {
+          auctionWindowById.put(window.auction, window);
+          continue;
+        }
+        List<AuctionOrBidWindow> sameAuctionBids = bidWindowsById.get(window.auction);
+        if (sameAuctionBids == null) {
+          sameAuctionBids = new ArrayList<>();
+          bidWindowsById.put(window.auction, sameAuctionBids);
+        }
+        sameAuctionBids.add(window);
+      }
+
+      // Merge all 'bid' windows into their corresponding 'auction' window, provided the
+      // auction has not expired.
+      for (Map.Entry<Long, AuctionOrBidWindow> entry : auctionWindowById.entrySet()) {
+        AuctionOrBidWindow auctionWindow = entry.getValue();
+        List<AuctionOrBidWindow> candidates = bidWindowsById.get(entry.getKey());
+        if (candidates == null) {
+          continue;
+        }
+        List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
+        for (AuctionOrBidWindow bidWindow : candidates) {
+          // A bid window which starts at or after the auction's end is not merged. It will
+          // remain until its expire time, at which point it will expire without ever
+          // contributing to an output.
+          if (bidWindow.start().isBefore(auctionWindow.end())) {
+            toBeMerged.add(bidWindow);
+          }
+        }
+        if (toBeMerged.isEmpty()) {
+          continue;
+        }
+        toBeMerged.add(auctionWindow);
+        c.merge(toBeMerged, auctionWindow);
+      }
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return other instanceof AuctionOrBidWindowFn;
+    }
+
+    @Override
+    public Coder<AuctionOrBidWindow> windowCoder() {
+      return AuctionOrBidWindowCoder.of();
+    }
+
+    @Override
+    public AuctionOrBidWindow getSideInputWindow(BoundedWindow window) {
+      throw new UnsupportedOperationException("AuctionWindowFn not supported 
for side inputs");
+    }
+
+    /**
+     * Below we will GBK auctions and bids on their auction ids. Then we will 
reduce those
+     * per id to emit {@code (auction, winning bid)} pairs for auctions which 
have expired with at
+     * least one valid bid. We would like those output pairs to have a 
timestamp of the auction's
+     * expiry (since that's the earliest we know for sure we have the correct 
winner). We would
+     * also like to make that winning results are available to following 
stages at the auction's
+     * expiry.
+     *
+     * <p>
+     * Each result of the GBK will have a timestamp of the min of the result 
of this object's
+     * assignOutputTime over all records which end up in one of its iterables. 
Thus we get the
+     * desired behavior if we ignore each record's timestamp and always return 
the auction window's
+     * 'maxTimestamp', which will correspond to the auction's expiry.
+     *
+     * <p>
+     * In contrast, if this object's assignOutputTime were to return 
'inputTimestamp'
+     * (the usual implementation), then each GBK record will take as its 
timestamp the minimum of
+     * the timestamps of all bids and auctions within it, which will always be 
the auction's
+     * timestamp. An auction which expires well into the future would thus 
hold up the watermark
+     * of the GBK results until that auction expired. That in turn would hold 
up all winning pairs.
+     */
+    @Override
+    public Instant getOutputTime(
+        Instant inputTimestamp, AuctionOrBidWindow window) {
+      return window.maxTimestamp();
+    }
+  }
+
+  private final AuctionOrBidWindowFn auctionOrBidWindowFn;
+
+  /**
+   * Creates the transform, sizing the bid windows from the auction duration expected under
+   * {@code configuration} when events arrive at the slowest configured rate.
+   */
+  public WinningBids(String name, NexmarkConfiguration configuration) {
+    super(name);
+    // What's the expected auction time (when the system is running at the lowest event rate).
+    long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(
+        configuration.firstEventRate, configuration.nextEventRate,
+        configuration.rateUnit, configuration.numEventGenerators);
+    long longestDelayUs = 0;
+    for (long delayUs : interEventDelayUs) {
+      longestDelayUs = Math.max(longestDelayUs, delayUs);
+    }
+    // Adjust for proportion of auction events amongst all events.
+    long delayBetweenAuctionsUs =
+        (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR)
+        / GeneratorConfig.AUCTION_PROPORTION;
+    // Adjust for number of in-flight auctions.
+    long auctionDurationUs = delayBetweenAuctionsUs * configuration.numInFlightAuctions;
+    // Convert microseconds to milliseconds, rounding up.
+    long expectedAuctionDurationMs = (auctionDurationUs + 999) / 1000;
+    NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
+    auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
+  }
+
+  @Override
+  public PCollection<AuctionBid> apply(PCollection<Event> events) {
+    // Window auctions and bids into custom auction windows. New people events 
will be discarded.
+    // This will allow us to bring bids and auctions together irrespective of 
how long
+    // each auction is open for.
+    events = events.apply(Window.named("Window").into(auctionOrBidWindowFn));
+
+    // Key auctions by their id.
+    PCollection<KV<Long, Auction>> auctionsById =
+        
events.apply(NexmarkQuery.JUST_NEW_AUCTIONS).apply(NexmarkQuery.AUCTION_BY_ID);
+
+    // Key bids by their auction id.
+    PCollection<KV<Long, Bid>> bidsByAuctionId =
+        
events.apply(NexmarkQuery.JUST_BIDS).apply(NexmarkQuery.BID_BY_AUCTION);
+
+    // Find the highest price valid bid for each closed auction.
+    return
+        // Join auctions and bids.
+        KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
+            .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
+            .apply(CoGroupByKey.<Long>create())
+
+            // Filter and select.
+            .apply(
+                ParDo.named(name + ".Join")
+                    .of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
+                      final Aggregator<Long, Long> noAuctionCounter =
+                          createAggregator("noAuction", new SumLongFn());
+                      final Aggregator<Long, Long> underReserveCounter =
+                          createAggregator("underReserve", new SumLongFn());
+                      final Aggregator<Long, Long> noValidBidsCounter =
+                          createAggregator("noValidBids", new SumLongFn());
+
+
+                      @Override
+                      public void processElement(ProcessContext c) {
+                        Auction auction =
+                            
c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
+                        if (auction == null) {
+                          // We have bids without a matching auction. Give up.
+                          noAuctionCounter.addValue(1L);
+                          return;
+                        }
+                        // Find the current winning bid for auction.
+                        // The earliest bid with the maximum price above the 
reserve wins.
+                        Bid bestBid = null;
+                        for (Bid bid : 
c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
+                          // Bids too late for their auction will have been
+                          // filtered out by the window merge function.
+                          Preconditions.checkState(bid.dateTime < 
auction.expires);
+                          if (bid.price < auction.reserve) {
+                            // Bid price is below auction reserve.
+                            underReserveCounter.addValue(1L);
+                            continue;
+                          }
+
+                          if (bestBid == null
+                              || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, 
bestBid) > 0) {
+                            bestBid = bid;
+                          }
+                        }
+                        if (bestBid == null) {
+                          // We don't have any valid bids for auction.
+                          noValidBidsCounter.addValue(1L);
+                          return;
+                        }
+                        c.output(new AuctionBid(auction, bestBid));
+                      }
+                    }));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
new file mode 100644
index 0000000..b61aed1
--- /dev/null
+++ 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simulator of the {@code WinningBids} query.
+ *
+ * <p>Auction and bid events are consumed one at a time. An auction is retired once its expiry
+ * time is at or before the timestamp of the latest auction or bid event; if it has a best valid
+ * bid, that bid is emitted as the winner, timestamped with the auction's expiry time.
+ */
+public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
+  /** Auctions currently still open, indexed by auction id. */
+  private final Map<Long, Auction> openAuctions;
+
+  /** The ids of auctions known to be closed. */
+  private final Set<Long> closedAuctions;
+
+  /** Current best valid bids for open auctions, indexed by auction id. */
+  private final Map<Long, Bid> bestBids;
+
+  /** Bids for auctions we haven't seen yet. */
+  private final List<Bid> bidsWithoutAuctions;
+
+  /**
+   * Timestamp of last new auction or bid event (ms since epoch).
+   */
+  private long lastTimestamp;
+
+  public WinningBidsSimulator(NexmarkConfiguration configuration) {
+    super(NexmarkUtils.standardEventIterator(configuration));
+    openAuctions = new TreeMap<>();
+    closedAuctions = new TreeSet<>();
+    bestBids = new TreeMap<>();
+    bidsWithoutAuctions = new ArrayList<>();
+    lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+  }
+
+  /**
+   * Try to account for {@code bid} in state. Return true if bid has now been
+   * accounted for by {@code bestBids}.
+   *
+   * <p>A bid counts as accounted for when its auction is already closed, when it is below the
+   * auction's reserve, or when it has been compared against the current best bid. It is not
+   * accounted for only when its auction is neither open nor closed.
+   *
+   * @param bid the bid to consider
+   * @param shouldLog whether to log the decision taken for this bid
+   */
+  private boolean captureBestBid(Bid bid, boolean shouldLog) {
+    if (closedAuctions.contains(bid.auction)) {
+      // Ignore bids for known, closed auctions.
+      if (shouldLog) {
+        NexmarkUtils.info("closed auction: %s", bid);
+      }
+      return true;
+    }
+    Auction auction = openAuctions.get(bid.auction);
+    if (auction == null) {
+      // We don't have an auction for this bid yet, so can't determine if it is
+      // winning or not.
+      if (shouldLog) {
+        NexmarkUtils.info("pending auction: %s", bid);
+      }
+      return false;
+    }
+    if (bid.price < auction.reserve) {
+      // Bid price is too low.
+      if (shouldLog) {
+        NexmarkUtils.info("below reserve: %s", bid);
+      }
+      return true;
+    }
+    Bid existingBid = bestBids.get(bid.auction);
+    if (existingBid == null
+        || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) {
+      // We've found a (new) best bid for a known auction.
+      bestBids.put(bid.auction, bid);
+      if (shouldLog) {
+        NexmarkUtils.info("new winning bid: %s", bid);
+      }
+    } else {
+      if (shouldLog) {
+        NexmarkUtils.info("ignoring low bid: %s", bid);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Try to match bids without auctions to auctions.
+   */
+  private void flushBidsWithoutAuctions() {
+    Iterator<Bid> itr = bidsWithoutAuctions.iterator();
+    while (itr.hasNext()) {
+      Bid bid = itr.next();
+      if (captureBestBid(bid, false)) {
+        NexmarkUtils.info("bid now accounted for: %s", bid);
+        // Remove through the iterator so the list can be modified while it is being walked.
+        itr.remove();
+      }
+    }
+  }
+
+  /**
+   * Return the next winning bid for an expired auction relative to {@code timestamp}.
+   * Return null if no more winning bids, in which case all expired auctions will
+   * have been removed from our state. Retire auctions in order of expire time.
+   *
+   * <p>Every retired auction is recorded in {@code closedAuctions} and its entry in
+   * {@code bestBids} is dropped, so late bids for it are ignored rather than held as pending.
+   *
+   * @param timestamp auctions expiring at or before this time (ms since epoch) are retired
+   */
+  @Nullable
+  private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
+    // Group the ids of expired auctions by expiry time; the TreeMap iterates in expiry order.
+    Map<Long, List<Long>> toBeRetired = new TreeMap<>();
+    for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) {
+      long expires = entry.getValue().expires;
+      if (expires <= timestamp) {
+        List<Long> idsAtTime = toBeRetired.get(expires);
+        if (idsAtTime == null) {
+          idsAtTime = new ArrayList<>();
+          toBeRetired.put(expires, idsAtTime);
+        }
+        idsAtTime.add(entry.getKey());
+      }
+    }
+    for (List<Long> idsAtTime : toBeRetired.values()) {
+      for (long id : idsAtTime) {
+        Auction auction = openAuctions.remove(id);
+        NexmarkUtils.info("retiring auction: %s", auction);
+        // Remember the auction as closed so that any later bid for it is dropped by
+        // captureBestBid instead of sitting in bidsWithoutAuctions forever.
+        closedAuctions.add(id);
+        // The auction is no longer open, so its best bid must not linger in our state.
+        Bid bestBid = bestBids.remove(id);
+        if (bestBid != null) {
+          TimestampedValue<AuctionBid> result =
+              TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires));
+          NexmarkUtils.info("winning: %s", result);
+          // Emit one winner per call; remaining expired auctions are retired by later calls.
+          return result;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Advance the simulation by one step: emit at most one pending winning bid, otherwise consume
+   * one input event and fold it into our state.
+   */
+  @Override
+  protected void run() {
+    if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+      // We may have finally seen the auction a bid was intended for.
+      flushBidsWithoutAuctions();
+      TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp);
+      if (result != null) {
+        addResult(result);
+        return;
+      }
+    }
+
+    TimestampedValue<Event> timestampedEvent = nextInput();
+    if (timestampedEvent == null) {
+      // No more events. Flush any still open auctions.
+      TimestampedValue<AuctionBid> result =
+          nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+      if (result == null) {
+        // We are done.
+        allDone();
+        return;
+      }
+      addResult(result);
+      return;
+    }
+
+    Event event = timestampedEvent.getValue();
+    if (event.newPerson != null) {
+      // Ignore new person events.
+      return;
+    }
+
+    lastTimestamp = timestampedEvent.getTimestamp().getMillis();
+    if (event.newAuction != null) {
+      // Add this new open auction to our state.
+      openAuctions.put(event.newAuction.id, event.newAuction);
+    } else {
+      if (!captureBestBid(event.bid, true)) {
+        // We don't know what to do with this bid yet.
+        NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
+        bidsWithoutAuctions.add(event.bid);
+      }
+    }
+    // Keep looking for winning bids.
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java
 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java
new file mode 100644
index 0000000..f017267
--- /dev/null
+++ 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/BoundedEventSourceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link BoundedEventSource}.
+ */
+@RunWith(JUnit4.class)
+public class BoundedEventSourceTest {
+  /**
+   * Build a {@link GeneratorConfig} over the default NEXMark configuration, anchored at the
+   * current wall-clock time. Presumably {@code n} is the number of events to generate; confirm
+   * against the {@link GeneratorConfig} constructor.
+   */
+  private GeneratorConfig makeConfig(long n) {
+    long nowMs = System.currentTimeMillis();
+    return new GeneratorConfig(NexmarkConfiguration.DEFAULT, nowMs, 0, n, 0);
+  }
+
+  /** A freshly created reader must yield exactly what its source describes. */
+  @Test
+  public void sourceAndReadersWork() throws Exception {
+    DataflowPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    BoundedEventSource eventSource = new BoundedEventSource(makeConfig(200L), 1);
+
+    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(
+        eventSource.createReader(pipelineOptions), pipelineOptions);
+  }
+
+  /** Dynamic splitting at a fraction must honour the source contract. */
+  @Test
+  public void splitAtFractionRespectsContract() throws Exception {
+    DataflowPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    BoundedEventSource eventSource = new BoundedEventSource(makeConfig(20L), 1);
+
+    // Can't split if already consumed.
+    SourceTestUtils.assertSplitAtFractionFails(eventSource, 10, 0.3, pipelineOptions);
+
+    SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(
+        eventSource, 5, 0.3, pipelineOptions);
+
+    SourceTestUtils.assertSplitAtFractionExhaustive(eventSource, pipelineOptions);
+  }
+
+  /** Splitting into bundles must produce sources equivalent to the original one. */
+  @Test
+  public void splitIntoBundlesRespectsContract() throws Exception {
+    DataflowPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    BoundedEventSource eventSource = new BoundedEventSource(makeConfig(200L), 1);
+
+    SourceTestUtils.assertSourcesEqualReferenceSource(
+        eventSource, eventSource.splitIntoBundles(10, pipelineOptions), pipelineOptions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java
 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java
new file mode 100644
index 0000000..bbaee26
--- /dev/null
+++ 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/GeneratorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests for {@link Generator}.
+ */
+@RunWith(JUnit4.class)
+public class GeneratorTest {
+  /**
+   * Build a {@link GeneratorConfig} over the default NEXMark configuration, anchored at the
+   * current wall-clock time. Presumably {@code n} is the number of events to generate; confirm
+   * against the {@link GeneratorConfig} constructor.
+   */
+  private GeneratorConfig makeConfig(long n) {
+    long nowMs = System.currentTimeMillis();
+    return new GeneratorConfig(NexmarkConfiguration.DEFAULT, nowMs, 0, n, 0);
+  }
+
+  /** Number of event ids covered by {@code config}: stop id minus start id. */
+  private static long eventIdSpan(GeneratorConfig config) {
+    return config.getStopEventId() - config.getStartEventId();
+  }
+
+  /**
+   * Pull exactly {@code n} elements from {@code itr}, asserting each one is available.
+   *
+   * @return {@code n}
+   */
+  private <T> long consume(long n, Iterator<T> itr) {
+    long remaining = n;
+    while (remaining > 0) {
+      assertTrue(itr.hasNext());
+      itr.next();
+      remaining--;
+    }
+    return n;
+  }
+
+  /**
+   * Drain {@code itr} completely.
+   *
+   * @return how many elements were pulled
+   */
+  private <T> long consume(Iterator<T> itr) {
+    long count = 0;
+    for (; itr.hasNext(); count++) {
+      itr.next();
+    }
+    return count;
+  }
+
+  /** Splitting running generators at event ids must neither lose nor duplicate events. */
+  @Test
+  public void splitAtFractionPreservesOverallEventCount() {
+    GeneratorConfig initialConfig = makeConfig(55729L);
+    long expected = eventIdSpan(initialConfig);
+
+    Generator first = new Generator(initialConfig);
+
+    // Consume some events.
+    long actual = consume(5000, first);
+
+    // Split once.
+    Generator second = new Generator(first.splitAtEventId(9000L));
+
+    // Consume some more events.
+    actual += consume(2000, first);
+    actual += consume(3000, second);
+
+    // Split again.
+    Generator third = new Generator(second.splitAtEventId(30000L));
+
+    // Run to completion.
+    actual += consume(first);
+    actual += consume(second);
+    actual += consume(third);
+
+    assertEquals(expected, actual);
+  }
+
+  /** Statically splitting a configuration must neither lose nor duplicate events. */
+  @Test
+  public void splitPreservesOverallEventCount() {
+    GeneratorConfig initialConfig = makeConfig(51237L);
+    long expected = eventIdSpan(initialConfig);
+
+    // Build every generator up front, then drain them one after another.
+    List<Generator> parts = new ArrayList<>();
+    for (GeneratorConfig partConfig : initialConfig.split(20)) {
+      parts.add(new Generator(partConfig));
+    }
+
+    long actual = 0;
+    for (Generator part : parts) {
+      actual += consume(part);
+    }
+
+    assertEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java
 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java
new file mode 100644
index 0000000..860fa78
--- /dev/null
+++ 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Checks that each NEXMark query produces results that agree with its reference model.
+ */
+@RunWith(JUnit4.class)
+public class QueryTest {
+  /** Shared configuration: a copy of the defaults limited to 2000 events. */
+  private static final NexmarkConfiguration CONFIG = smallConfig();
+
+  /** Clone the default configuration and cap the number of events at 2000. */
+  private static NexmarkConfiguration smallConfig() {
+    NexmarkConfiguration config = NexmarkConfiguration.DEFAULT.clone();
+    config.numEvents = 2000;
+    return config;
+  }
+
+  /**
+   * Run {@code query} over a batch event source named {@code name} and assert that its output
+   * satisfies the assertion supplied by {@code model}.
+   */
+  private static void queryMatchesModel(
+      String name, NexmarkQuery query, NexmarkQueryModel model) {
+    Pipeline pipeline = TestPipeline.create();
+    NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, pipeline);
+    PCollection<TimestampedValue<KnownSize>> queryOutput =
+        pipeline.apply(NexmarkUtils.batchEventsSource(name, CONFIG)).apply(query);
+    queryOutput.setIsBoundedInternal(IsBounded.BOUNDED);
+    PAssert.that(queryOutput).satisfies(model.assertionFor());
+    pipeline.run();
+  }
+
+  @Test
+  public void query0MatchesModel() {
+    NexmarkQuery query = new Query0(CONFIG);
+    queryMatchesModel("Query0Test", query, new Query0Model(CONFIG));
+  }
+
+  @Test
+  public void query1MatchesModel() {
+    NexmarkQuery query = new Query1(CONFIG);
+    queryMatchesModel("Query1Test", query, new Query1Model(CONFIG));
+  }
+
+  @Test
+  public void query2MatchesModel() {
+    NexmarkQuery query = new Query2(CONFIG);
+    queryMatchesModel("Query2Test", query, new Query2Model(CONFIG));
+  }
+
+  @Test
+  public void query3MatchesModel() {
+    NexmarkQuery query = new Query3(CONFIG);
+    queryMatchesModel("Query3Test", query, new Query3Model(CONFIG));
+  }
+
+  @Test
+  public void query4MatchesModel() {
+    NexmarkQuery query = new Query4(CONFIG);
+    queryMatchesModel("Query4Test", query, new Query4Model(CONFIG));
+  }
+
+  @Test
+  public void query5MatchesModel() {
+    NexmarkQuery query = new Query5(CONFIG);
+    queryMatchesModel("Query5Test", query, new Query5Model(CONFIG));
+  }
+
+  @Test
+  public void query6MatchesModel() {
+    NexmarkQuery query = new Query6(CONFIG);
+    queryMatchesModel("Query6Test", query, new Query6Model(CONFIG));
+  }
+
+  @Test
+  public void query7MatchesModel() {
+    NexmarkQuery query = new Query7(CONFIG);
+    queryMatchesModel("Query7Test", query, new Query7Model(CONFIG));
+  }
+
+  @Test
+  public void query8MatchesModel() {
+    NexmarkQuery query = new Query8(CONFIG);
+    queryMatchesModel("Query8Test", query, new Query8Model(CONFIG));
+  }
+
+  @Test
+  public void query9MatchesModel() {
+    NexmarkQuery query = new Query9(CONFIG);
+    queryMatchesModel("Query9Test", query, new Query9Model(CONFIG));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git 
a/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
new file mode 100644
index 0000000..5d72f77
--- /dev/null
+++ 
b/integration/java/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Tests for UnboundedEventSource.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedEventSourceTest {
+  /**
+   * Build a {@link GeneratorConfig} over the default NEXMark configuration, anchored at the
+   * current wall-clock time. Presumably {@code n} is the number of events to generate; confirm
+   * against the {@link GeneratorConfig} constructor.
+   */
+  private GeneratorConfig makeConfig(long n) {
+    long nowMs = System.currentTimeMillis();
+    return new GeneratorConfig(NexmarkConfiguration.DEFAULT, nowMs, 0, n, 0);
+  }
+
+  /**
+   * Records the person and auction ids observed so far, failing on any duplicate, and verifies
+   * that events read from a reader equal those produced by a model generator.
+   */
+  private static class EventIdChecker {
+    private final Set<Long> seenPersonIds = new HashSet<>();
+    private final Set<Long> seenAuctionIds = new HashSet<>();
+
+    /** Register the id carried by {@code event}, asserting it has not been seen before. */
+    public void add(Event event) {
+      if (event.newAuction != null) {
+        assertTrue(seenAuctionIds.add(event.newAuction.id));
+        return;
+      }
+      if (event.newPerson != null) {
+        assertTrue(seenPersonIds.add(event.newPerson.id));
+      }
+    }
+
+    /**
+     * Read {@code n} events from {@code reader}, checking each against the next event of
+     * {@code modelGenerator} (compared by string form) and registering its id.
+     */
+    public void add(int n, UnboundedReader<Event> reader, Generator modelGenerator)
+        throws IOException {
+      for (int remaining = n; remaining > 0; remaining--) {
+        assertTrue(modelGenerator.hasNext());
+        Event expectedEvent = modelGenerator.next().getValue();
+        assertTrue(reader.advance());
+        Event readEvent = reader.getCurrent();
+        assertEquals(expectedEvent.toString(), readEvent.toString());
+        add(readEvent);
+      }
+    }
+  }
+
+  /**
+   * Repeatedly checkpoint the reader and resume from that checkpoint, verifying the resulting
+   * event stream is exactly the one obtained by reading the generator directly.
+   */
+  @Test
+  public void resumeFromCheckpoint() throws IOException {
+    Random random = new Random(297);
+    int remaining = 47293;
+    GeneratorConfig config = makeConfig(remaining);
+    Generator modelGenerator = new Generator(config);
+
+    EventIdChecker checker = new EventIdChecker();
+    Pipeline pipeline = TestPipeline.create();
+    PipelineOptions options = pipeline.getOptions();
+    UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
+    UnboundedReader<Event> reader = source.createReader(options, null);
+
+    while (remaining > 0) {
+      // Read a pseudo-random sized batch, capped by what is left.
+      int batchSize = Math.min(459 + random.nextInt(455), remaining);
+      System.out.printf("reading %d...\n", batchSize);
+      checker.add(batchSize, reader, modelGenerator);
+      remaining -= batchSize;
+
+      // Checkpoint, then continue with a brand new reader restored from that checkpoint.
+      System.out.printf("splitting with %d remaining...\n", remaining);
+      CheckpointMark mark = reader.getCheckpointMark();
+      assertTrue(mark instanceof Generator.Checkpoint);
+      reader = source.createReader(options, (Generator.Checkpoint) mark);
+    }
+
+    assertFalse(reader.advance());
+  }
+}

Reply via email to