http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java new file mode 100644 index 0000000..3c1cf3b --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.Monitor; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.CategoryPrice; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Mean; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Query 4, 'Average Price for a Category'. Select the average of the wining bid prices for all + * closed auctions in each category. In CQL syntax: + * + * <pre>{@code + * SELECT Istream(AVG(Q.final)) + * FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category) + * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] + * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME + * GROUP BY A.id, A.category) Q + * WHERE Q.category = C.id + * GROUP BY C.id; + * }</pre> + * + * <p>For extra spiciness our implementation differs slightly from the above: + * <ul> + * <li>We select both the average winning price and the category. + * <li>We don't bother joining with a static category table, since it's contents are never used. + * <li>We only consider bids which are above the auction's reserve price. + * <li>We accept the highest-price, earliest valid bid as the winner. + * <li>We calculate the averages oven a sliding window of size {@code windowSizeSec} and + * period {@code windowPeriodSec}. + * </ul> + */ +public class Query4 extends NexmarkQuery { + private final Monitor<AuctionBid> winningBidsMonitor; + + public Query4(NexmarkConfiguration configuration) { + super(configuration, "Query4"); + winningBidsMonitor = new Monitor<>(name + ".WinningBids", "winning"); + } + + private PCollection<CategoryPrice> applyTyped(PCollection<Event> events) { + PCollection<AuctionBid> winningBids = + events + // Find the winning bid for each closed auction. + .apply(new WinningBids(name + ".WinningBids", configuration)); + + // Monitor winning bids + winningBids = winningBids.apply(name + ".WinningBidsMonitor", + winningBidsMonitor.getTransform()); + + return winningBids + // Key the winning bid price by the auction category. + .apply(name + ".Rekey", + ParDo.of(new DoFn<AuctionBid, KV<Long, Long>>() { + @ProcessElement + public void processElement(ProcessContext c) { + Auction auction = c.element().auction; + Bid bid = c.element().bid; + c.output(KV.of(auction.category, bid.price)); + } + })) + + // Re-window so we can calculate a sliding average + .apply(Window.<KV<Long, Long>>into( + SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec)) + .every(Duration.standardSeconds(configuration.windowPeriodSec)))) + + // Find the average of the winning bids for each category. + // Make sure we share the work for each category between workers. + .apply(Mean.<Long, Long>perKey().withHotKeyFanout(configuration.fanout)) + + // For testing against Query4Model, capture which results are 'final'. + .apply(name + ".Project", + ParDo.of(new DoFn<KV<Long, Double>, CategoryPrice>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(new CategoryPrice(c.element().getKey(), + Math.round(c.element().getValue()), c.pane().isLast())); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java new file mode 100644 index 0000000..84274a8 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.CategoryPrice; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Assert; + +/** + * A direct implementation of {@link Query4}. + */ +public class Query4Model extends NexmarkQueryModel implements Serializable { + /** + * Simulator for query 4. + */ + private class Simulator extends AbstractSimulator<AuctionBid, CategoryPrice> { + /** The prices and categories for all winning bids in the last window size. */ + private final List<TimestampedValue<CategoryPrice>> winningPricesByCategory; + + /** Timestamp of last result (ms since epoch). */ + private Instant lastTimestamp; + + /** When oldest active window starts. */ + private Instant windowStart; + + /** The last seen result for each category. */ + private final Map<Long, TimestampedValue<CategoryPrice>> lastSeenResults; + + public Simulator(NexmarkConfiguration configuration) { + super(new WinningBidsSimulator(configuration).results()); + winningPricesByCategory = new ArrayList<>(); + lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + windowStart = NexmarkUtils.BEGINNING_OF_TIME; + lastSeenResults = new TreeMap<>(); + } + + /** + * Calculate the average bid price for each category for all winning bids + * which are strictly before {@code end}. + */ + private void averages(Instant end) { + Map<Long, Long> counts = new TreeMap<>(); + Map<Long, Long> totals = new TreeMap<>(); + for (TimestampedValue<CategoryPrice> value : winningPricesByCategory) { + if (!value.getTimestamp().isBefore(end)) { + continue; + } + long category = value.getValue().category; + long price = value.getValue().price; + Long count = counts.get(category); + if (count == null) { + count = 1L; + } else { + count += 1; + } + counts.put(category, count); + Long total = totals.get(category); + if (total == null) { + total = price; + } else { + total += price; + } + totals.put(category, total); + } + for (Map.Entry<Long, Long> entry : counts.entrySet()) { + long category = entry.getKey(); + long count = entry.getValue(); + long total = totals.get(category); + TimestampedValue<CategoryPrice> result = TimestampedValue.of( + new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp); + addIntermediateResult(result); + lastSeenResults.put(category, result); + } + } + + /** + * Calculate averages for any windows which can now be retired. Also prune entries + * which can no longer contribute to any future window. + */ + private void prune(Instant newWindowStart) { + while (!newWindowStart.equals(windowStart)) { + averages(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec))); + windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec)); + Iterator<TimestampedValue<CategoryPrice>> itr = winningPricesByCategory.iterator(); + while (itr.hasNext()) { + if (itr.next().getTimestamp().isBefore(windowStart)) { + itr.remove(); + } + } + if (winningPricesByCategory.isEmpty()) { + windowStart = newWindowStart; + } + } + } + + /** + * Capture the winning bid. + */ + private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { + winningPricesByCategory.add( + TimestampedValue.of(new CategoryPrice(auction.category, bid.price, false), timestamp)); + } + + @Override + protected void run() { + TimestampedValue<AuctionBid> timestampedWinningBid = nextInput(); + if (timestampedWinningBid == null) { + prune(NexmarkUtils.END_OF_TIME); + for (TimestampedValue<CategoryPrice> result : lastSeenResults.values()) { + addResult(result); + } + allDone(); + return; + } + lastTimestamp = timestampedWinningBid.getTimestamp(); + Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), + Duration.standardSeconds(configuration.windowPeriodSec), lastTimestamp); + prune(newWindowStart); + captureWinningBid(timestampedWinningBid.getValue().auction, + timestampedWinningBid.getValue().bid, lastTimestamp); + } + } + + public Query4Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected Iterable<TimestampedValue<KnownSize>> relevantResults( + Iterable<TimestampedValue<KnownSize>> results) { + // Find the last (in processing time) reported average price for each category. + Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>(); + for (TimestampedValue<KnownSize> obj : results) { + Assert.assertTrue("have CategoryPrice", obj.getValue() instanceof CategoryPrice); + CategoryPrice categoryPrice = (CategoryPrice) obj.getValue(); + if (categoryPrice.isLast) { + finalAverages.put( + categoryPrice.category, + TimestampedValue.of((KnownSize) categoryPrice, obj.getTimestamp())); + } + } + + return finalAverages.values(); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValue(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java new file mode 100644 index 0000000..d027cb3 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.AuctionCount; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour (updated every + * minute). In CQL syntax: + * + * <pre>{@code + * SELECT Rstream(auction) + * FROM (SELECT B1.auction, count(*) AS num + * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1 + * GROUP BY B1.auction) + * WHERE num >= ALL (SELECT count(*) + * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2 + * GROUP BY B2.auction); + * }</pre> + * + * <p>To make things a bit more dynamic and easier to test we use much shorter windows, and + * we'll also preserve the bid counts. + */ +public class Query5 extends NexmarkQuery { + public Query5(NexmarkConfiguration configuration) { + super(configuration, "Query5"); + } + + private PCollection<AuctionCount> applyTyped(PCollection<Event> events) { + return events + // Only want the bid events. + .apply(JUST_BIDS) + // Window the bids into sliding windows. + .apply( + Window.<Bid>into( + SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec)) + .every(Duration.standardSeconds(configuration.windowPeriodSec)))) + // Project just the auction id. + .apply("BidToAuction", BID_TO_AUCTION) + + // Count the number of bids per auction id. + .apply(Count.<Long>perElement()) + + // We'll want to keep all auctions with the maximal number of bids. + // Start by lifting each into a singleton list. + // need to do so because bellow combine returns a list of auctions in the key in case of + // equal number of bids. Combine needs to have same input type and return type. + .apply( + name + ".ToSingletons", + ParDo.of( + new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output( + KV.of( + Collections.singletonList(c.element().getKey()), + c.element().getValue())); + } + })) + + // Keep only the auction ids with the most bids. + .apply( + Combine.globally( + new Combine.BinaryCombineFn<KV<List<Long>, Long>>() { + @Override + public KV<List<Long>, Long> apply( + KV<List<Long>, Long> left, KV<List<Long>, Long> right) { + List<Long> leftBestAuctions = left.getKey(); + long leftCount = left.getValue(); + List<Long> rightBestAuctions = right.getKey(); + long rightCount = right.getValue(); + if (leftCount > rightCount) { + return left; + } else if (leftCount < rightCount) { + return right; + } else { + List<Long> newBestAuctions = new ArrayList<>(); + newBestAuctions.addAll(leftBestAuctions); + newBestAuctions.addAll(rightBestAuctions); + return KV.of(newBestAuctions, leftCount); + } + } + }) + .withoutDefaults() + .withFanout(configuration.fanout)) + + // Project into result. + .apply( + name + ".Select", + ParDo.of( + new DoFn<KV<List<Long>, Long>, AuctionCount>() { + @ProcessElement + public void processElement(ProcessContext c) { + long count = c.element().getValue(); + for (long auction : c.element().getKey()) { + c.output(new AuctionCount(auction, count)); + } + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java new file mode 100644 index 0000000..7ed0709 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.AuctionCount; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A direct implementation of {@link Query5}. + */ +public class Query5Model extends NexmarkQueryModel implements Serializable { + /** + * Simulator for query 5. + */ + private class Simulator extends AbstractSimulator<Event, AuctionCount> { + /** Time of bids still contributing to open windows, indexed by their auction id. */ + private final Map<Long, List<Instant>> bids; + + /** When oldest active window starts. */ + private Instant windowStart; + + public Simulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + bids = new TreeMap<>(); + windowStart = NexmarkUtils.BEGINNING_OF_TIME; + } + + /** + * Count bids per auction id for bids strictly before {@code end}. Add the auction ids with + * the maximum number of bids to results. + */ + private void countBids(Instant end) { + Map<Long, Long> counts = new TreeMap<>(); + long maxCount = 0L; + for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) { + long count = 0L; + long auction = entry.getKey(); + for (Instant bid : entry.getValue()) { + if (bid.isBefore(end)) { + count++; + } + } + if (count > 0) { + counts.put(auction, count); + maxCount = Math.max(maxCount, count); + } + } + for (Map.Entry<Long, Long> entry : counts.entrySet()) { + long auction = entry.getKey(); + long count = entry.getValue(); + if (count == maxCount) { + AuctionCount result = new AuctionCount(auction, count); + addResult(TimestampedValue.of(result, end)); + } + } + } + + /** + * Retire bids which are strictly before {@code cutoff}. Return true if there are any bids + * remaining. + */ + private boolean retireBids(Instant cutoff) { + boolean anyRemain = false; + for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) { + long auction = entry.getKey(); + Iterator<Instant> itr = entry.getValue().iterator(); + while (itr.hasNext()) { + Instant bid = itr.next(); + if (bid.isBefore(cutoff)) { + NexmarkUtils.info("retire: %s for %s", bid, auction); + itr.remove(); + } else { + anyRemain = true; + } + } + } + return anyRemain; + } + + /** + * Retire active windows until we've reached {@code newWindowStart}. + */ + private void retireWindows(Instant newWindowStart) { + while (!newWindowStart.equals(windowStart)) { + NexmarkUtils.info("retiring window %s, aiming for %s", windowStart, newWindowStart); + // Count bids in the window (windowStart, windowStart + size]. + countBids(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec))); + // Advance the window. + windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec)); + // Retire bids which will never contribute to a future window. + if (!retireBids(windowStart)) { + // Can fast forward to latest window since no more outstanding bids. + windowStart = newWindowStart; + } + } + } + + /** + * Add bid to state. + */ + private void captureBid(Bid bid, Instant timestamp) { + List<Instant> existing = bids.get(bid.auction); + if (existing == null) { + existing = new ArrayList<>(); + bids.put(bid.auction, existing); + } + existing.add(timestamp); + } + + @Override + public void run() { + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + // Drain the remaining windows. + retireWindows(NexmarkUtils.END_OF_TIME); + allDone(); + return; + } + + Event event = timestampedEvent.getValue(); + if (event.bid == null) { + // Ignore non-bid events. + return; + } + Instant timestamp = timestampedEvent.getTimestamp(); + Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), + Duration.standardSeconds(configuration.windowPeriodSec), timestamp); + // Capture results from any windows we can now retire. + retireWindows(newWindowStart); + // Capture current bid. + captureBid(event.bid, timestamp); + } + } + + public Query5Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValue(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java new file mode 100644 index 0000000..bc6b12c --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.nexmark.model.SellerPrice; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the + * last 10 closed auctions by the same seller. In CQL syntax: + * + * <pre>{@code + * SELECT Istream(AVG(Q.final), Q.seller) + * FROM (SELECT Rstream(MAX(B.price) AS final, A.seller) + * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] + * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME + * GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q + * GROUP BY Q.seller; + * }</pre> + * + * <p>We are a little more exact with selecting winning bids: see {@link WinningBids}. + */ +public class Query6 extends NexmarkQuery { + /** + * Combiner to keep track of up to {@code maxNumBids} of the most recent wining bids and calculate + * their average selling price. + */ + private static class MovingMeanSellingPrice extends Combine.CombineFn<Bid, List<Bid>, Long> { + private final int maxNumBids; + + public MovingMeanSellingPrice(int maxNumBids) { + this.maxNumBids = maxNumBids; + } + + @Override + public List<Bid> createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List<Bid> addInput(List<Bid> accumulator, Bid input) { + accumulator.add(input); + Collections.sort(accumulator, Bid.ASCENDING_TIME_THEN_PRICE); + if (accumulator.size() > maxNumBids) { + accumulator.remove(0); + } + return accumulator; + } + + @Override + public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) { + List<Bid> result = new ArrayList<>(); + for (List<Bid> accumulator : accumulators) { + result.addAll(accumulator); + } + Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE); + if (result.size() > maxNumBids) { + result = Lists.newArrayList(result.listIterator(result.size() - maxNumBids)); + } + return result; + } + + @Override + public Long extractOutput(List<Bid> accumulator) { + if (accumulator.isEmpty()) { + return 0L; + } + long sumOfPrice = 0; + for (Bid bid : accumulator) { + sumOfPrice += bid.price; + } + return Math.round((double) sumOfPrice / accumulator.size()); + } + } + + public Query6(NexmarkConfiguration configuration) { + super(configuration, "Query6"); + } + + private PCollection<SellerPrice> applyTyped(PCollection<Event> events) { + return events + // Find the winning bid for each closed auction. + .apply(new WinningBids(name + ".WinningBids", configuration)) + + // Key the winning bid by the seller id. + .apply(name + ".Rekey", + ParDo.of(new DoFn<AuctionBid, KV<Long, Bid>>() { + @ProcessElement + public void processElement(ProcessContext c) { + Auction auction = c.element().auction; + Bid bid = c.element().bid; + c.output(KV.of(auction.seller, bid)); + } + })) + + // Re-window to update on every wining bid. + .apply( + Window.<KV<Long, Bid>>into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.ZERO)) + + // Find the average of last 10 winning bids for each seller. + .apply(Combine.<Long, Bid, Long>perKey(new MovingMeanSellingPrice(10))) + + // Project into our datatype. + .apply(name + ".Select", + ParDo.of(new DoFn<KV<Long, Long>, SellerPrice>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(new SellerPrice(c.element().getKey(), c.element().getValue())); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java new file mode 100644 index 0000000..b5152d8 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.nexmark.model.SellerPrice; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.junit.Assert; + +/** + * A direct implementation of {@link Query6}. + */ +public class Query6Model extends NexmarkQueryModel implements Serializable { + /** + * Simulator for query 6. + */ + private static class Simulator extends AbstractSimulator<AuctionBid, SellerPrice> { + /** The cumulative count of winning bids, indexed by seller id. */ + private final Map<Long, Long> numWinningBidsPerSeller; + + /** The cumulative total of winning bid prices, indexed by seller id. */ + private final Map<Long, Long> totalWinningBidPricesPerSeller; + + private Instant lastTimestamp; + + public Simulator(NexmarkConfiguration configuration) { + super(new WinningBidsSimulator(configuration).results()); + numWinningBidsPerSeller = new TreeMap<>(); + totalWinningBidPricesPerSeller = new TreeMap<>(); + lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * Update the per-seller running counts/sums. + */ + private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { + NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid); + Long count = numWinningBidsPerSeller.get(auction.seller); + if (count == null) { + count = 1L; + } else { + count += 1; + } + numWinningBidsPerSeller.put(auction.seller, count); + Long total = totalWinningBidPricesPerSeller.get(auction.seller); + if (total == null) { + total = bid.price; + } else { + total += bid.price; + } + totalWinningBidPricesPerSeller.put(auction.seller, total); + TimestampedValue<SellerPrice> intermediateResult = TimestampedValue.of( + new SellerPrice(auction.seller, Math.round((double) total / count)), timestamp); + addIntermediateResult(intermediateResult); + } + + + @Override + protected void run() { + TimestampedValue<AuctionBid> timestampedWinningBid = nextInput(); + if (timestampedWinningBid == null) { + for (Map.Entry<Long, Long> entry : numWinningBidsPerSeller.entrySet()) { + long seller = entry.getKey(); + long count = entry.getValue(); + long total = totalWinningBidPricesPerSeller.get(seller); + addResult(TimestampedValue.of( + new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp)); + } + allDone(); + return; + } + + lastTimestamp = timestampedWinningBid.getTimestamp(); + captureWinningBid(timestampedWinningBid.getValue().auction, + timestampedWinningBid.getValue().bid, lastTimestamp); + } + } + + public Query6Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected Iterable<TimestampedValue<KnownSize>> relevantResults( + Iterable<TimestampedValue<KnownSize>> results) { + // Find the last (in processing time) reported average price for each seller. + Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>(); + for (TimestampedValue<KnownSize> obj : results) { + Assert.assertTrue("have SellerPrice", obj.getValue() instanceof SellerPrice); + SellerPrice sellerPrice = (SellerPrice) obj.getValue(); + finalAverages.put( + sellerPrice.seller, TimestampedValue.of((KnownSize) sellerPrice, obj.getTimestamp())); + } + return finalAverages.values(); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValue(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java new file mode 100644 index 0000000..71b75c3 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import org.joda.time.Duration; + +/** + * Query 7, 'Highest Bid'. Select the bids with the highest bid + * price in the last minute. In CQL syntax: + * + * <pre> + * SELECT Rstream(B.auction, B.price, B.bidder) + * FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B + * WHERE B.price = (SELECT MAX(B1.price) + * FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1); + * </pre> + * + * <p>We will use a shorter window to help make testing easier. We'll also implement this using + * a side-input in order to exercise that functionality. (A combiner, as used in Query 5, is + * a more efficient approach.). + */ +public class Query7 extends NexmarkQuery { + public Query7(NexmarkConfiguration configuration) { + super(configuration, "Query7"); + } + + private PCollection<Bid> applyTyped(PCollection<Event> events) { + // Window the bids. + PCollection<Bid> slidingBids = events.apply(JUST_BIDS).apply( + Window.<Bid>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))); + + // Find the largest price in all bids. + // NOTE: It would be more efficient to write this query much as we did for Query5, using + // a binary combiner to accumulate the bids with maximal price. As written this query + // requires an additional scan per window, with the associated cost of snapshotted state and + // its I/O. We'll keep this implementation since it illustrates the use of side inputs. + final PCollectionView<Long> maxPriceView = + slidingBids + .apply("BidToPrice", BID_TO_PRICE) + .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView()); + + return slidingBids + // Select all bids which have that maximum price (there may be more than one). + .apply(name + ".Select", ParDo + .of(new DoFn<Bid, Bid>() { + @ProcessElement + public void processElement(ProcessContext c) { + long maxPrice = c.sideInput(maxPriceView); + Bid bid = c.element(); + if (bid.price == maxPrice) { + c.output(bid); + } + } + }) + .withSideInputs(maxPriceView)); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java new file mode 100644 index 0000000..4011746 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A direct implementation of {@link Query7}. + */ +public class Query7Model extends NexmarkQueryModel implements Serializable { + /** + * Simulator for query 7. + */ + private class Simulator extends AbstractSimulator<Event, Bid> { + /** Bids with highest bid price seen in the current window. */ + private final List<Bid> highestBids; + + /** When current window started. */ + private Instant windowStart; + + private Instant lastTimestamp; + + public Simulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + highestBids = new ArrayList<>(); + windowStart = NexmarkUtils.BEGINNING_OF_TIME; + lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * Transfer the currently winning bids into results and retire them. + */ + private void retireWindow(Instant timestamp) { + for (Bid bid : highestBids) { + addResult(TimestampedValue.of(bid, timestamp)); + } + highestBids.clear(); + } + + /** + * Keep just the highest price bid. + */ + private void captureBid(Bid bid) { + Iterator<Bid> itr = highestBids.iterator(); + boolean isWinning = true; + while (itr.hasNext()) { + Bid existingBid = itr.next(); + if (existingBid.price > bid.price) { + isWinning = false; + break; + } + NexmarkUtils.info("smaller price: %s", existingBid); + itr.remove(); + } + if (isWinning) { + NexmarkUtils.info("larger price: %s", bid); + highestBids.add(bid); + } + } + + @Override + protected void run() { + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + // Capture all remaining bids in results. + retireWindow(lastTimestamp); + allDone(); + return; + } + + Event event = timestampedEvent.getValue(); + if (event.bid == null) { + // Ignore non-bid events. + return; + } + lastTimestamp = timestampedEvent.getTimestamp(); + Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), + Duration.standardSeconds(configuration.windowSizeSec), lastTimestamp); + if (!newWindowStart.equals(windowStart)) { + // Capture highest priced bids in current window and retire it. + retireWindow(lastTimestamp); + windowStart = newWindowStart; + } + // Keep only the highest bids. + captureBid(event.bid); + } + } + + public Query7Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValueOrder(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java new file mode 100644 index 0000000..fa3dd86 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.IdNameReserve; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Query 8, 'Monitor New Users'. Select people who have entered the system and created auctions + * in the last 12 hours, updated every 12 hours. In CQL syntax: + * + * <pre> + * SELECT Rstream(P.id, P.name, A.reserve) + * FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A + * WHERE P.id = A.seller; + * </pre> + * + * <p>To make things a bit more dynamic and easier to test we'll use a much shorter window. + */ +public class Query8 extends NexmarkQuery { + public Query8(NexmarkConfiguration configuration) { + super(configuration, "Query8"); + } + + private PCollection<IdNameReserve> applyTyped(PCollection<Event> events) { + // Window and key new people by their id. + PCollection<KV<Long, Person>> personsById = + events + .apply(JUST_NEW_PERSONS) + .apply("Query8.WindowPersons", + Window.<Person>into( + FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))) + .apply("PersonById", PERSON_BY_ID); + + // Window and key new auctions by their id. + PCollection<KV<Long, Auction>> auctionsBySeller = + events.apply(JUST_NEW_AUCTIONS) + .apply("Query8.WindowAuctions", + Window.<Auction>into( + FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))) + .apply("AuctionBySeller", AUCTION_BY_SELLER); + + // Join people and auctions and project the person id, name and auction reserve price. + return KeyedPCollectionTuple.of(PERSON_TAG, personsById) + .and(AUCTION_TAG, auctionsBySeller) + .apply(CoGroupByKey.<Long>create()) + .apply(name + ".Select", + ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() { + @ProcessElement + public void processElement(ProcessContext c) { + Person person = c.element().getValue().getOnly(PERSON_TAG, null); + if (person == null) { + // Person was not created in last window period. + return; + } + for (Auction auction : c.element().getValue().getAll(AUCTION_TAG)) { + c.output(new IdNameReserve(person.id, person.name, auction.reserve)); + } + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java new file mode 100644 index 0000000..351cef7 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.IdNameReserve; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A direct implementation of {@link Query8}. + */ +public class Query8Model extends NexmarkQueryModel implements Serializable { + /** + * Simulator for query 8. + */ + private class Simulator extends AbstractSimulator<Event, IdNameReserve> { + /** New persons seen in the current window, indexed by id. */ + private final Map<Long, Person> newPersons; + + /** New auctions seen in the current window, indexed by seller id. */ + private final Multimap<Long, Auction> newAuctions; + + /** When did the current window start. */ + private Instant windowStart; + + public Simulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + newPersons = new HashMap<>(); + newAuctions = ArrayListMultimap.create(); + windowStart = NexmarkUtils.BEGINNING_OF_TIME; + } + + /** + * Retire all persons added in last window. + */ + private void retirePersons() { + for (Map.Entry<Long, Person> entry : newPersons.entrySet()) { + NexmarkUtils.info("retire: %s", entry.getValue()); + } + newPersons.clear(); + } + + /** + * Retire all auctions added in last window. + */ + private void retireAuctions() { + for (Map.Entry<Long, Auction> entry : newAuctions.entries()) { + NexmarkUtils.info("retire: %s", entry.getValue()); + } + newAuctions.clear(); + } + + /** + * Capture new result. + */ + private void addResult(Auction auction, Person person, Instant timestamp) { + addResult(TimestampedValue.of( + new IdNameReserve(person.id, person.name, auction.reserve), timestamp)); + } + + @Override + public void run() { + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + allDone(); + return; + } + + Event event = timestampedEvent.getValue(); + if (event.bid != null) { + // Ignore bid events. + // Keep looking for next events. + return; + } + Instant timestamp = timestampedEvent.getTimestamp(); + Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), + Duration.standardSeconds(configuration.windowSizeSec), timestamp); + if (!newWindowStart.equals(windowStart)) { + // Retire this window. + retirePersons(); + retireAuctions(); + windowStart = newWindowStart; + } + + if (event.newAuction != null) { + // Join new auction with existing person, if any. + Person person = newPersons.get(event.newAuction.seller); + if (person != null) { + addResult(event.newAuction, person, timestamp); + } else { + // Remember auction for future new people. + newAuctions.put(event.newAuction.seller, event.newAuction); + } + } else { // event is not an auction, nor a bid, so it is a person + // Join new person with existing auctions. + for (Auction auction : newAuctions.get(event.newPerson.id)) { + addResult(auction, event.newPerson, timestamp); + } + // We'll never need these auctions again. + newAuctions.removeAll(event.newPerson.id); + // Remember person for future auctions. + newPersons.put(event.newPerson.id, event.newPerson); + } + } + } + + public Query8Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new Simulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValue(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java new file mode 100644 index 0000000..5f11e4e --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.values.PCollection; + +/** + * Query "9", 'Winning bids'. Select just the winning bids. Not in original NEXMark suite, but + * handy for testing. See {@link WinningBids} for the details. + */ +public class Query9 extends NexmarkQuery { + public Query9(NexmarkConfiguration configuration) { + super(configuration, "Query9"); + } + + private PCollection<AuctionBid> applyTyped(PCollection<Event> events) { + return events.apply(new WinningBids(name, configuration)); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java new file mode 100644 index 0000000..48d792e --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.values.TimestampedValue; + +/** + * A direct implementation of {@link Query9}. + */ +public class Query9Model extends NexmarkQueryModel implements Serializable { + public Query9Model(NexmarkConfiguration configuration) { + super(configuration); + } + + @Override + public AbstractSimulator<?, ?> simulator() { + return new WinningBidsSimulator(configuration); + } + + @Override + protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { + return toValue(itr); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java new file mode 100644 index 0000000..816a81f --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import static com.google.common.base.Preconditions.checkState; + +import com.fasterxml.jackson.annotation.JsonCreator; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.sources.GeneratorConfig; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; + +/** + * A transform to find the winning bid for each closed auction. In pseudo CQL syntax: + * + * <pre>{@code + * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime) + * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] + * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME + * GROUP BY A.id + * }</pre> + * + * <p>We will also check that the winning bid is above the auction reserve. Note that + * we ignore the auction opening bid value since it has no impact on which bid eventually wins, + * if any. + * + * <p>Our implementation will use a custom windowing function in order to bring bids and + * auctions together without requiring global state. + */ +public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> { + /** Windows for open auctions and bids. */ + private static class AuctionOrBidWindow extends IntervalWindow { + /** Id of auction this window is for. */ + public final long auction; + + /** + * True if this window represents an actual auction, and thus has a start/end + * time matching that of the auction. False if this window represents a bid, and + * thus has an unbounded start/end time. + */ + public final boolean isAuctionWindow; + + /** For avro only. */ + private AuctionOrBidWindow() { + super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE); + auction = 0; + isAuctionWindow = false; + } + + private AuctionOrBidWindow( + Instant start, Instant end, long auctionId, boolean isAuctionWindow) { + super(start, end); + this.auction = auctionId; + this.isAuctionWindow = isAuctionWindow; + } + + /** Return an auction window for {@code auction}. */ + public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) { + return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true); + } + + /** + * Return a bid window for {@code bid}. It should later be merged into + * the corresponding auction window. However, it is possible this bid is for an already + * expired auction, or for an auction which the system has not yet seen. So we + * give the bid a bit of wiggle room in its interval. + */ + public static AuctionOrBidWindow forBid( + long expectedAuctionDurationMs, Instant timestamp, Bid bid) { + // At this point we don't know which auctions are still valid, and the bid may + // be for an auction which won't start until some unknown time in the future + // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid). + // A real system would atomically reconcile bids and auctions by a separate mechanism. + // If we give bids an unbounded window it is possible a bid for an auction which + // has already expired would cause the system watermark to stall, since that window + // would never be retired. + // Instead, we will just give the bid a finite window which expires at + // the upper bound of auctions assuming the auction starts at the same time as the bid, + // and assuming the system is running at its lowest event rate (as per interEventDelayUs). + return new AuctionOrBidWindow( + timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false); + } + + /** Is this an auction window? */ + public boolean isAuctionWindow() { + return isAuctionWindow; + } + + @Override + public String toString() { + return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", + start(), end(), auction, isAuctionWindow); + } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + AuctionOrBidWindow that = (AuctionOrBidWindow) o; + return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction); + } + + @Override public int hashCode() { + return Objects.hash(isAuctionWindow, auction); + } + } + + /** + * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long. + */ + private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> { + private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder(); + private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder(); + private static final Coder<Long> ID_CODER = VarLongCoder.of(); + private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + + @JsonCreator + public static AuctionOrBidWindowCoder of() { + return INSTANCE; + } + + @Override + public void encode(AuctionOrBidWindow window, OutputStream outStream) + throws IOException, CoderException { + SUPER_CODER.encode(window, outStream); + ID_CODER.encode(window.auction, outStream); + INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream); + } + + @Override + public AuctionOrBidWindow decode(InputStream inStream) + throws IOException, CoderException { + IntervalWindow superWindow = SUPER_CODER.decode(inStream); + long auction = ID_CODER.decode(inStream); + boolean isAuctionWindow = INT_CODER.decode(inStream) != 0; + return new AuctionOrBidWindow( + superWindow.start(), superWindow.end(), auction, isAuctionWindow); + } + + @Override public void verifyDeterministic() throws NonDeterministicException {} + } + + /** Assign events to auction windows and merges them intelligently. */ + private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> { + /** Expected duration of auctions in ms. */ + private final long expectedAuctionDurationMs; + + public AuctionOrBidWindowFn(long expectedAuctionDurationMs) { + this.expectedAuctionDurationMs = expectedAuctionDurationMs; + } + + @Override + public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) { + Event event = c.element(); + if (event.newAuction != null) { + // Assign auctions to an auction window which expires at the auction's close. + return Collections + .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); + } else if (event.bid != null) { + // Assign bids to a temporary bid window which will later be merged into the appropriate + // auction window. + return Collections.singletonList( + AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid)); + } else { + // Don't assign people to any window. They will thus be dropped. + return Collections.emptyList(); + } + } + + @Override + public void mergeWindows(MergeContext c) throws Exception { + // Split and index the auction and bid windows by auction id. + Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>(); + Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>(); + for (AuctionOrBidWindow window : c.windows()) { + if (window.isAuctionWindow()) { + idToTrueAuctionWindow.put(window.auction, window); + } else { + List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction); + if (bidWindows == null) { + bidWindows = new ArrayList<>(); + idToBidAuctionWindows.put(window.auction, bidWindows); + } + bidWindows.add(window); + } + } + + // Merge all 'bid' windows into their corresponding 'auction' window, provided the + // auction has not expired. + for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) { + long auction = entry.getKey(); + AuctionOrBidWindow auctionWindow = entry.getValue(); + List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction); + if (bidWindows != null) { + List<AuctionOrBidWindow> toBeMerged = new ArrayList<>(); + for (AuctionOrBidWindow bidWindow : bidWindows) { + if (bidWindow.start().isBefore(auctionWindow.end())) { + toBeMerged.add(bidWindow); + } + // else: This bid window will remain until its expire time, at which point it + // will expire without ever contributing to an output. + } + if (!toBeMerged.isEmpty()) { + toBeMerged.add(auctionWindow); + c.merge(toBeMerged, auctionWindow); + } + } + } + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return other instanceof AuctionOrBidWindowFn; + } + + @Override + public Coder<AuctionOrBidWindow> windowCoder() { + return AuctionOrBidWindowCoder.of(); + } + + @Override + public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs"); + } + + /** + * Below we will GBK auctions and bids on their auction ids. Then we will reduce those + * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at + * least one valid bid. We would like those output pairs to have a timestamp of the auction's + * expiry (since that's the earliest we know for sure we have the correct winner). We would + * also like to make that winning results are available to following stages at the auction's + * expiry. + * + * <p>Each result of the GBK will have a timestamp of the min of the result of this object's + * assignOutputTime over all records which end up in one of its iterables. Thus we get the + * desired behavior if we ignore each record's timestamp and always return the auction window's + * 'maxTimestamp', which will correspond to the auction's expiry. + * + * <p>In contrast, if this object's assignOutputTime were to return 'inputTimestamp' + * (the usual implementation), then each GBK record will take as its timestamp the minimum of + * the timestamps of all bids and auctions within it, which will always be the auction's + * timestamp. An auction which expires well into the future would thus hold up the watermark + * of the GBK results until that auction expired. That in turn would hold up all winning pairs. + */ + @Override + public Instant getOutputTime( + Instant inputTimestamp, AuctionOrBidWindow window) { + return window.maxTimestamp(); + } + } + + private final AuctionOrBidWindowFn auctionOrBidWindowFn; + + public WinningBids(String name, NexmarkConfiguration configuration) { + super(name); + // What's the expected auction time (when the system is running at the lowest event rate). + long[] interEventDelayUs = configuration.rateShape.interEventDelayUs( + configuration.firstEventRate, configuration.nextEventRate, + configuration.rateUnit, configuration.numEventGenerators); + long longestDelayUs = 0; + for (long interEventDelayU : interEventDelayUs) { + longestDelayUs = Math.max(longestDelayUs, interEventDelayU); + } + // Adjust for proportion of auction events amongst all events. + longestDelayUs = + (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR) + / GeneratorConfig.AUCTION_PROPORTION; + // Adjust for number of in-flight auctions. + longestDelayUs = longestDelayUs * configuration.numInFlightAuctions; + long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000; + NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs); + auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs); + } + + @Override + public PCollection<AuctionBid> expand(PCollection<Event> events) { + // Window auctions and bids into custom auction windows. New people events will be discarded. + // This will allow us to bring bids and auctions together irrespective of how long + // each auction is open for. + events = events.apply("Window", Window.into(auctionOrBidWindowFn)); + + // Key auctions by their id. + PCollection<KV<Long, Auction>> auctionsById = + events.apply(NexmarkQuery.JUST_NEW_AUCTIONS) + .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID); + + // Key bids by their auction id. + PCollection<KV<Long, Bid>> bidsByAuctionId = + events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION); + + // Find the highest price valid bid for each closed auction. + return + // Join auctions and bids. + KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById) + .and(NexmarkQuery.BID_TAG, bidsByAuctionId) + .apply(CoGroupByKey.<Long>create()) + // Filter and select. + .apply(name + ".Join", + ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { + private final Counter noAuctionCounter = Metrics.counter(name, "noAuction"); + private final Counter underReserveCounter = Metrics.counter(name, "underReserve"); + private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids"); + + @ProcessElement + public void processElement(ProcessContext c) { + Auction auction = + c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null); + if (auction == null) { + // We have bids without a matching auction. Give up. + noAuctionCounter.inc(); + return; + } + // Find the current winning bid for auction. + // The earliest bid with the maximum price above the reserve wins. + Bid bestBid = null; + for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) { + // Bids too late for their auction will have been + // filtered out by the window merge function. + checkState(bid.dateTime < auction.expires); + if (bid.price < auction.reserve) { + // Bid price is below auction reserve. + underReserveCounter.inc(); + continue; + } + + if (bestBid == null + || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) { + bestBid = bid; + } + } + if (bestBid == null) { + // We don't have any valid bids for auction. + noValidBidsCounter.inc(); + return; + } + c.output(new AuctionBid(auction, bestBid)); + } + } + )); + } + + @Override + public int hashCode() { + return Objects.hash(auctionOrBidWindowFn); + } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + WinningBids that = (WinningBids) o; + return auctionOrBidWindowFn.equals(that.auctionOrBidWindowFn); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java new file mode 100644 index 0000000..69b64c0 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.NexmarkUtils; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.AuctionBid; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; + +/** + * A simulator of the {@code WinningBids} query. + */ +public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> { + /** Auctions currently still open, indexed by auction id. */ + private final Map<Long, Auction> openAuctions; + + /** The ids of auctions known to be closed. */ + private final Set<Long> closedAuctions; + + /** Current best valid bids for open auctions, indexed by auction id. */ + private final Map<Long, Bid> bestBids; + + /** Bids for auctions we havn't seen yet. */ + private final List<Bid> bidsWithoutAuctions; + + /** + * Timestamp of last new auction or bid event (ms since epoch). + */ + private long lastTimestamp; + + public WinningBidsSimulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + openAuctions = new TreeMap<>(); + closedAuctions = new TreeSet<>(); + bestBids = new TreeMap<>(); + bidsWithoutAuctions = new ArrayList<>(); + lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } + + /** + * Try to account for {@code bid} in state. Return true if bid has now been + * accounted for by {@code bestBids}. + */ + private boolean captureBestBid(Bid bid, boolean shouldLog) { + if (closedAuctions.contains(bid.auction)) { + // Ignore bids for known, closed auctions. + if (shouldLog) { + NexmarkUtils.info("closed auction: %s", bid); + } + return true; + } + Auction auction = openAuctions.get(bid.auction); + if (auction == null) { + // We don't have an auction for this bid yet, so can't determine if it is + // winning or not. + if (shouldLog) { + NexmarkUtils.info("pending auction: %s", bid); + } + return false; + } + if (bid.price < auction.reserve) { + // Bid price is too low. + if (shouldLog) { + NexmarkUtils.info("below reserve: %s", bid); + } + return true; + } + Bid existingBid = bestBids.get(bid.auction); + if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) { + // We've found a (new) best bid for a known auction. + bestBids.put(bid.auction, bid); + if (shouldLog) { + NexmarkUtils.info("new winning bid: %s", bid); + } + } else { + if (shouldLog) { + NexmarkUtils.info("ignoring low bid: %s", bid); + } + } + return true; + } + + /** + * Try to match bids without auctions to auctions. + */ + private void flushBidsWithoutAuctions() { + Iterator<Bid> itr = bidsWithoutAuctions.iterator(); + while (itr.hasNext()) { + Bid bid = itr.next(); + if (captureBestBid(bid, false)) { + NexmarkUtils.info("bid now accounted for: %s", bid); + itr.remove(); + } + } + } + + /** + * Return the next winning bid for an expired auction relative to {@code timestamp}. + * Return null if no more winning bids, in which case all expired auctions will + * have been removed from our state. Retire auctions in order of expire time. + */ + @Nullable + private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) { + Map<Long, List<Long>> toBeRetired = new TreeMap<>(); + for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) { + if (entry.getValue().expires <= timestamp) { + List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires); + if (idsAtTime == null) { + idsAtTime = new ArrayList<>(); + toBeRetired.put(entry.getValue().expires, idsAtTime); + } + idsAtTime.add(entry.getKey()); + } + } + for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) { + for (long id : entry.getValue()) { + Auction auction = openAuctions.get(id); + NexmarkUtils.info("retiring auction: %s", auction); + openAuctions.remove(id); + Bid bestBid = bestBids.get(id); + if (bestBid != null) { + TimestampedValue<AuctionBid> result = + TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires)); + NexmarkUtils.info("winning: %s", result); + return result; + } + } + } + return null; + } + + @Override + protected void run() { + if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { + // We may have finally seen the auction a bid was intended for. + flushBidsWithoutAuctions(); + TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp); + if (result != null) { + addResult(result); + return; + } + } + + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + // No more events. Flush any still open auctions. + TimestampedValue<AuctionBid> result = + nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + if (result == null) { + // We are done. + allDone(); + return; + } + addResult(result); + return; + } + + Event event = timestampedEvent.getValue(); + if (event.newPerson != null) { + // Ignore new person events. + return; + } + + lastTimestamp = timestampedEvent.getTimestamp().getMillis(); + if (event.newAuction != null) { + // Add this new open auction to our state. + openAuctions.put(event.newAuction.id, event.newAuction); + } else { + if (!captureBestBid(event.bid, true)) { + // We don't know what to do with this bid yet. + NexmarkUtils.info("bid not yet accounted for: %s", event.bid); + bidsWithoutAuctions.add(event.bid); + } + } + // Keep looking for winning bids. + } +}
