http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java deleted file mode 100644 index 9c0fe6d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.Monitor; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.CategoryPrice; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Mean; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query 4, 'Average Price for a Category'. Select the average of the wining bid prices for all - * closed auctions in each category. In CQL syntax: - * - * <pre>{@code - * SELECT Istream(AVG(Q.final)) - * FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category) - * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] - * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME - * GROUP BY A.id, A.category) Q - * WHERE Q.category = C.id - * GROUP BY C.id; - * }</pre> - * - * <p>For extra spiciness our implementation differs slightly from the above: - * <ul> - * <li>We select both the average winning price and the category. - * <li>We don't bother joining with a static category table, since it's contents are never used. - * <li>We only consider bids which are above the auction's reserve price. - * <li>We accept the highest-price, earliest valid bid as the winner. - * <li>We calculate the averages oven a sliding window of size {@code windowSizeSec} and - * period {@code windowPeriodSec}. - * </ul> - */ -public class Query4 extends NexmarkQuery { - private final Monitor<AuctionBid> winningBidsMonitor; - - public Query4(NexmarkConfiguration configuration) { - super(configuration, "Query4"); - winningBidsMonitor = new Monitor<>(name + ".WinningBids", "winning"); - } - - private PCollection<CategoryPrice> applyTyped(PCollection<Event> events) { - PCollection<AuctionBid> winningBids = - events - // Find the winning bid for each closed auction. - .apply(new WinningBids(name + ".WinningBids", configuration)); - - // Monitor winning bids - winningBids = winningBids.apply(name + ".WinningBidsMonitor", - winningBidsMonitor.getTransform()); - - return winningBids - // Key the winning bid price by the auction category. - .apply(name + ".Rekey", - ParDo.of(new DoFn<AuctionBid, KV<Long, Long>>() { - @ProcessElement - public void processElement(ProcessContext c) { - Auction auction = c.element().auction; - Bid bid = c.element().bid; - c.output(KV.of(auction.category, bid.price)); - } - })) - - // Re-window so we can calculate a sliding average - .apply(Window.<KV<Long, Long>>into( - SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec)) - .every(Duration.standardSeconds(configuration.windowPeriodSec)))) - - // Find the average of the winning bids for each category. - // Make sure we share the work for each category between workers. - .apply(Mean.<Long, Long>perKey().withHotKeyFanout(configuration.fanout)) - - // For testing against Query4Model, capture which results are 'final'. - .apply(name + ".Project", - ParDo.of(new DoFn<KV<Long, Double>, CategoryPrice>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(new CategoryPrice(c.element().getKey(), - Math.round(c.element().getValue()), c.pane().isLast())); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java deleted file mode 100644 index 269e47a..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.CategoryPrice; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Assert; - -/** - * A direct implementation of {@link Query4}. - */ -public class Query4Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 4. - */ - private class Simulator extends AbstractSimulator<AuctionBid, CategoryPrice> { - /** The prices and categories for all winning bids in the last window size. */ - private final List<TimestampedValue<CategoryPrice>> winningPricesByCategory; - - /** Timestamp of last result (ms since epoch). */ - private Instant lastTimestamp; - - /** When oldest active window starts. */ - private Instant windowStart; - - /** The last seen result for each category. */ - private final Map<Long, TimestampedValue<CategoryPrice>> lastSeenResults; - - public Simulator(NexmarkConfiguration configuration) { - super(new WinningBidsSimulator(configuration).results()); - winningPricesByCategory = new ArrayList<>(); - lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - windowStart = NexmarkUtils.BEGINNING_OF_TIME; - lastSeenResults = new TreeMap<>(); - } - - /** - * Calculate the average bid price for each category for all winning bids - * which are strictly before {@code end}. - */ - private void averages(Instant end) { - Map<Long, Long> counts = new TreeMap<>(); - Map<Long, Long> totals = new TreeMap<>(); - for (TimestampedValue<CategoryPrice> value : winningPricesByCategory) { - if (!value.getTimestamp().isBefore(end)) { - continue; - } - long category = value.getValue().category; - long price = value.getValue().price; - Long count = counts.get(category); - if (count == null) { - count = 1L; - } else { - count += 1; - } - counts.put(category, count); - Long total = totals.get(category); - if (total == null) { - total = price; - } else { - total += price; - } - totals.put(category, total); - } - for (Map.Entry<Long, Long> entry : counts.entrySet()) { - long category = entry.getKey(); - long count = entry.getValue(); - long total = totals.get(category); - TimestampedValue<CategoryPrice> result = TimestampedValue.of( - new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp); - addIntermediateResult(result); - lastSeenResults.put(category, result); - } - } - - /** - * Calculate averages for any windows which can now be retired. Also prune entries - * which can no longer contribute to any future window. - */ - private void prune(Instant newWindowStart) { - while (!newWindowStart.equals(windowStart)) { - averages(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec))); - windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec)); - Iterator<TimestampedValue<CategoryPrice>> itr = winningPricesByCategory.iterator(); - while (itr.hasNext()) { - if (itr.next().getTimestamp().isBefore(windowStart)) { - itr.remove(); - } - } - if (winningPricesByCategory.isEmpty()) { - windowStart = newWindowStart; - } - } - } - - /** - * Capture the winning bid. - */ - private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { - winningPricesByCategory.add( - TimestampedValue.of(new CategoryPrice(auction.category, bid.price, false), timestamp)); - } - - @Override - protected void run() { - TimestampedValue<AuctionBid> timestampedWinningBid = nextInput(); - if (timestampedWinningBid == null) { - prune(NexmarkUtils.END_OF_TIME); - for (TimestampedValue<CategoryPrice> result : lastSeenResults.values()) { - addResult(result); - } - allDone(); - return; - } - lastTimestamp = timestampedWinningBid.getTimestamp(); - Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), - Duration.standardSeconds(configuration.windowPeriodSec), lastTimestamp); - prune(newWindowStart); - captureWinningBid(timestampedWinningBid.getValue().auction, - timestampedWinningBid.getValue().bid, lastTimestamp); - } - } - - public Query4Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected Iterable<TimestampedValue<KnownSize>> relevantResults( - Iterable<TimestampedValue<KnownSize>> results) { - // Find the last (in processing time) reported average price for each category. - Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>(); - for (TimestampedValue<KnownSize> obj : results) { - Assert.assertTrue("have CategoryPrice", obj.getValue() instanceof CategoryPrice); - CategoryPrice categoryPrice = (CategoryPrice) obj.getValue(); - if (categoryPrice.isLast) { - finalAverages.put( - categoryPrice.category, - TimestampedValue.of((KnownSize) categoryPrice, obj.getTimestamp())); - } - } - - return finalAverages.values(); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValue(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java deleted file mode 100644 index bdf3e5f..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.AuctionCount; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour (updated every - * minute). In CQL syntax: - * - * <pre>{@code - * SELECT Rstream(auction) - * FROM (SELECT B1.auction, count(*) AS num - * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1 - * GROUP BY B1.auction) - * WHERE num >= ALL (SELECT count(*) - * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2 - * GROUP BY B2.auction); - * }</pre> - * - * <p>To make things a bit more dynamic and easier to test we use much shorter windows, and - * we'll also preserve the bid counts. - */ -public class Query5 extends NexmarkQuery { - public Query5(NexmarkConfiguration configuration) { - super(configuration, "Query5"); - } - - private PCollection<AuctionCount> applyTyped(PCollection<Event> events) { - return events - // Only want the bid events. - .apply(JUST_BIDS) - // Window the bids into sliding windows. - .apply( - Window.<Bid>into( - SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec)) - .every(Duration.standardSeconds(configuration.windowPeriodSec)))) - // Project just the auction id. - .apply("BidToAuction", BID_TO_AUCTION) - - // Count the number of bids per auction id. - .apply(Count.<Long>perElement()) - - // We'll want to keep all auctions with the maximal number of bids. - // Start by lifting each into a singleton list. - // need to do so because bellow combine returns a list of auctions in the key in case of - // equal number of bids. Combine needs to have same input type and return type. - .apply( - name + ".ToSingletons", - ParDo.of( - new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output( - KV.of( - Collections.singletonList(c.element().getKey()), - c.element().getValue())); - } - })) - - // Keep only the auction ids with the most bids. - .apply( - Combine.globally( - new Combine.BinaryCombineFn<KV<List<Long>, Long>>() { - @Override - public KV<List<Long>, Long> apply( - KV<List<Long>, Long> left, KV<List<Long>, Long> right) { - List<Long> leftBestAuctions = left.getKey(); - long leftCount = left.getValue(); - List<Long> rightBestAuctions = right.getKey(); - long rightCount = right.getValue(); - if (leftCount > rightCount) { - return left; - } else if (leftCount < rightCount) { - return right; - } else { - List<Long> newBestAuctions = new ArrayList<>(); - newBestAuctions.addAll(leftBestAuctions); - newBestAuctions.addAll(rightBestAuctions); - return KV.of(newBestAuctions, leftCount); - } - } - }) - .withoutDefaults() - .withFanout(configuration.fanout)) - - // Project into result. - .apply( - name + ".Select", - ParDo.of( - new DoFn<KV<List<Long>, Long>, AuctionCount>() { - @ProcessElement - public void processElement(ProcessContext c) { - long count = c.element().getValue(); - for (long auction : c.element().getKey()) { - c.output(new AuctionCount(auction, count)); - } - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java deleted file mode 100644 index 24d9a00..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.AuctionCount; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A direct implementation of {@link Query5}. - */ -public class Query5Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 5. - */ - private class Simulator extends AbstractSimulator<Event, AuctionCount> { - /** Time of bids still contributing to open windows, indexed by their auction id. */ - private final Map<Long, List<Instant>> bids; - - /** When oldest active window starts. */ - private Instant windowStart; - - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - bids = new TreeMap<>(); - windowStart = NexmarkUtils.BEGINNING_OF_TIME; - } - - /** - * Count bids per auction id for bids strictly before {@code end}. Add the auction ids with - * the maximum number of bids to results. - */ - private void countBids(Instant end) { - Map<Long, Long> counts = new TreeMap<>(); - long maxCount = 0L; - for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) { - long count = 0L; - long auction = entry.getKey(); - for (Instant bid : entry.getValue()) { - if (bid.isBefore(end)) { - count++; - } - } - if (count > 0) { - counts.put(auction, count); - maxCount = Math.max(maxCount, count); - } - } - for (Map.Entry<Long, Long> entry : counts.entrySet()) { - long auction = entry.getKey(); - long count = entry.getValue(); - if (count == maxCount) { - AuctionCount result = new AuctionCount(auction, count); - addResult(TimestampedValue.of(result, end)); - } - } - } - - /** - * Retire bids which are strictly before {@code cutoff}. Return true if there are any bids - * remaining. - */ - private boolean retireBids(Instant cutoff) { - boolean anyRemain = false; - for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) { - long auction = entry.getKey(); - Iterator<Instant> itr = entry.getValue().iterator(); - while (itr.hasNext()) { - Instant bid = itr.next(); - if (bid.isBefore(cutoff)) { - NexmarkUtils.info("retire: %s for %s", bid, auction); - itr.remove(); - } else { - anyRemain = true; - } - } - } - return anyRemain; - } - - /** - * Retire active windows until we've reached {@code newWindowStart}. - */ - private void retireWindows(Instant newWindowStart) { - while (!newWindowStart.equals(windowStart)) { - NexmarkUtils.info("retiring window %s, aiming for %s", windowStart, newWindowStart); - // Count bids in the window (windowStart, windowStart + size]. - countBids(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec))); - // Advance the window. - windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec)); - // Retire bids which will never contribute to a future window. - if (!retireBids(windowStart)) { - // Can fast forward to latest window since no more outstanding bids. - windowStart = newWindowStart; - } - } - } - - /** - * Add bid to state. - */ - private void captureBid(Bid bid, Instant timestamp) { - List<Instant> existing = bids.get(bid.auction); - if (existing == null) { - existing = new ArrayList<>(); - bids.put(bid.auction, existing); - } - existing.add(timestamp); - } - - @Override - public void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - // Drain the remaining windows. - retireWindows(NexmarkUtils.END_OF_TIME); - allDone(); - return; - } - - Event event = timestampedEvent.getValue(); - if (event.bid == null) { - // Ignore non-bid events. - return; - } - Instant timestamp = timestampedEvent.getTimestamp(); - Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), - Duration.standardSeconds(configuration.windowPeriodSec), timestamp); - // Capture results from any windows we can now retire. - retireWindows(newWindowStart); - // Capture current bid. - captureBid(event.bid, timestamp); - } - } - - public Query5Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValue(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java deleted file mode 100644 index ea39ede..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.SellerPrice; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the - * last 10 closed auctions by the same seller. In CQL syntax: - * - * <pre>{@code - * SELECT Istream(AVG(Q.final), Q.seller) - * FROM (SELECT Rstream(MAX(B.price) AS final, A.seller) - * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] - * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME - * GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q - * GROUP BY Q.seller; - * }</pre> - * - * <p>We are a little more exact with selecting winning bids: see {@link WinningBids}. - */ -public class Query6 extends NexmarkQuery { - /** - * Combiner to keep track of up to {@code maxNumBids} of the most recent wining bids and calculate - * their average selling price. - */ - private static class MovingMeanSellingPrice extends Combine.CombineFn<Bid, List<Bid>, Long> { - private final int maxNumBids; - - public MovingMeanSellingPrice(int maxNumBids) { - this.maxNumBids = maxNumBids; - } - - @Override - public List<Bid> createAccumulator() { - return new ArrayList<>(); - } - - @Override - public List<Bid> addInput(List<Bid> accumulator, Bid input) { - accumulator.add(input); - Collections.sort(accumulator, Bid.ASCENDING_TIME_THEN_PRICE); - if (accumulator.size() > maxNumBids) { - accumulator.remove(0); - } - return accumulator; - } - - @Override - public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) { - List<Bid> result = new ArrayList<>(); - for (List<Bid> accumulator : accumulators) { - result.addAll(accumulator); - } - Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE); - if (result.size() > maxNumBids) { - result = Lists.newArrayList(result.listIterator(result.size() - maxNumBids)); - } - return result; - } - - @Override - public Long extractOutput(List<Bid> accumulator) { - if (accumulator.isEmpty()) { - return 0L; - } - long sumOfPrice = 0; - for (Bid bid : accumulator) { - sumOfPrice += bid.price; - } - return Math.round((double) sumOfPrice / accumulator.size()); - } - } - - public Query6(NexmarkConfiguration configuration) { - super(configuration, "Query6"); - } - - private PCollection<SellerPrice> applyTyped(PCollection<Event> events) { - return events - // Find the winning bid for each closed auction. - .apply(new WinningBids(name + ".WinningBids", configuration)) - - // Key the winning bid by the seller id. - .apply(name + ".Rekey", - ParDo.of(new DoFn<AuctionBid, KV<Long, Bid>>() { - @ProcessElement - public void processElement(ProcessContext c) { - Auction auction = c.element().auction; - Bid bid = c.element().bid; - c.output(KV.of(auction.seller, bid)); - } - })) - - // Re-window to update on every wining bid. - .apply( - Window.<KV<Long, Bid>>into(new GlobalWindows()) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) - .accumulatingFiredPanes() - .withAllowedLateness(Duration.ZERO)) - - // Find the average of last 10 winning bids for each seller. - .apply(Combine.<Long, Bid, Long>perKey(new MovingMeanSellingPrice(10))) - - // Project into our datatype. - .apply(name + ".Select", - ParDo.of(new DoFn<KV<Long, Long>, SellerPrice>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(new SellerPrice(c.element().getKey(), c.element().getValue())); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java deleted file mode 100644 index 9cb8b3d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.SellerPrice; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -import org.junit.Assert; - -/** - * A direct implementation of {@link Query6}. - */ -public class Query6Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 6. - */ - private static class Simulator extends AbstractSimulator<AuctionBid, SellerPrice> { - /** The cumulative count of winning bids, indexed by seller id. */ - private final Map<Long, Long> numWinningBidsPerSeller; - - /** The cumulative total of winning bid prices, indexed by seller id. */ - private final Map<Long, Long> totalWinningBidPricesPerSeller; - - private Instant lastTimestamp; - - public Simulator(NexmarkConfiguration configuration) { - super(new WinningBidsSimulator(configuration).results()); - numWinningBidsPerSeller = new TreeMap<>(); - totalWinningBidPricesPerSeller = new TreeMap<>(); - lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - /** - * Update the per-seller running counts/sums. - */ - private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { - NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid); - Long count = numWinningBidsPerSeller.get(auction.seller); - if (count == null) { - count = 1L; - } else { - count += 1; - } - numWinningBidsPerSeller.put(auction.seller, count); - Long total = totalWinningBidPricesPerSeller.get(auction.seller); - if (total == null) { - total = bid.price; - } else { - total += bid.price; - } - totalWinningBidPricesPerSeller.put(auction.seller, total); - TimestampedValue<SellerPrice> intermediateResult = TimestampedValue.of( - new SellerPrice(auction.seller, Math.round((double) total / count)), timestamp); - addIntermediateResult(intermediateResult); - } - - - @Override - protected void run() { - TimestampedValue<AuctionBid> timestampedWinningBid = nextInput(); - if (timestampedWinningBid == null) { - for (Map.Entry<Long, Long> entry : numWinningBidsPerSeller.entrySet()) { - long seller = entry.getKey(); - long count = entry.getValue(); - long total = totalWinningBidPricesPerSeller.get(seller); - addResult(TimestampedValue.of( - new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp)); - } - allDone(); - return; - } - - lastTimestamp = timestampedWinningBid.getTimestamp(); - captureWinningBid(timestampedWinningBid.getValue().auction, - timestampedWinningBid.getValue().bid, lastTimestamp); - } - } - - public Query6Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected Iterable<TimestampedValue<KnownSize>> relevantResults( - Iterable<TimestampedValue<KnownSize>> results) { - // Find the last (in processing time) reported average price for each seller. - Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>(); - for (TimestampedValue<KnownSize> obj : results) { - Assert.assertTrue("have SellerPrice", obj.getValue() instanceof SellerPrice); - SellerPrice sellerPrice = (SellerPrice) obj.getValue(); - finalAverages.put( - sellerPrice.seller, TimestampedValue.of((KnownSize) sellerPrice, obj.getTimestamp())); - } - return finalAverages.values(); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValue(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java deleted file mode 100644 index 217d0d4..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -import org.joda.time.Duration; - -/** - * Query 7, 'Highest Bid'. Select the bids with the highest bid - * price in the last minute. In CQL syntax: - * - * <pre> - * SELECT Rstream(B.auction, B.price, B.bidder) - * FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B - * WHERE B.price = (SELECT MAX(B1.price) - * FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1); - * </pre> - * - * <p>We will use a shorter window to help make testing easier. We'll also implement this using - * a side-input in order to exercise that functionality. (A combiner, as used in Query 5, is - * a more efficient approach.). - */ -public class Query7 extends NexmarkQuery { - public Query7(NexmarkConfiguration configuration) { - super(configuration, "Query7"); - } - - private PCollection<Bid> applyTyped(PCollection<Event> events) { - // Window the bids. - PCollection<Bid> slidingBids = events.apply(JUST_BIDS).apply( - Window.<Bid>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))); - - // Find the largest price in all bids. - // NOTE: It would be more efficient to write this query much as we did for Query5, using - // a binary combiner to accumulate the bids with maximal price. As written this query - // requires an additional scan per window, with the associated cost of snapshotted state and - // its I/O. We'll keep this implementation since it illustrates the use of side inputs. - final PCollectionView<Long> maxPriceView = - slidingBids - .apply("BidToPrice", BID_TO_PRICE) - .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView()); - - return slidingBids - // Select all bids which have that maximum price (there may be more than one). - .apply(name + ".Select", ParDo - .of(new DoFn<Bid, Bid>() { - @ProcessElement - public void processElement(ProcessContext c) { - long maxPrice = c.sideInput(maxPriceView); - Bid bid = c.element(); - if (bid.price == maxPrice) { - c.output(bid); - } - } - }) - .withSideInputs(maxPriceView)); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java deleted file mode 100644 index 0ada5e8..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A direct implementation of {@link Query7}. - */ -public class Query7Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 7. - */ - private class Simulator extends AbstractSimulator<Event, Bid> { - /** Bids with highest bid price seen in the current window. */ - private final List<Bid> highestBids; - - /** When current window started. */ - private Instant windowStart; - - private Instant lastTimestamp; - - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - highestBids = new ArrayList<>(); - windowStart = NexmarkUtils.BEGINNING_OF_TIME; - lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - /** - * Transfer the currently winning bids into results and retire them. - */ - private void retireWindow(Instant timestamp) { - for (Bid bid : highestBids) { - addResult(TimestampedValue.of(bid, timestamp)); - } - highestBids.clear(); - } - - /** - * Keep just the highest price bid. - */ - private void captureBid(Bid bid) { - Iterator<Bid> itr = highestBids.iterator(); - boolean isWinning = true; - while (itr.hasNext()) { - Bid existingBid = itr.next(); - if (existingBid.price > bid.price) { - isWinning = false; - break; - } - NexmarkUtils.info("smaller price: %s", existingBid); - itr.remove(); - } - if (isWinning) { - NexmarkUtils.info("larger price: %s", bid); - highestBids.add(bid); - } - } - - @Override - protected void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - // Capture all remaining bids in results. - retireWindow(lastTimestamp); - allDone(); - return; - } - - Event event = timestampedEvent.getValue(); - if (event.bid == null) { - // Ignore non-bid events. - return; - } - lastTimestamp = timestampedEvent.getTimestamp(); - Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), - Duration.standardSeconds(configuration.windowSizeSec), lastTimestamp); - if (!newWindowStart.equals(windowStart)) { - // Capture highest priced bids in current window and retire it. - retireWindow(lastTimestamp); - windowStart = newWindowStart; - } - // Keep only the highest bids. - captureBid(event.bid); - } - } - - public Query7Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValueOrder(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java deleted file mode 100644 index 603841b..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.IdNameReserve; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -/** - * Query 8, 'Monitor New Users'. Select people who have entered the system and created auctions - * in the last 12 hours, updated every 12 hours. In CQL syntax: - * - * <pre> - * SELECT Rstream(P.id, P.name, A.reserve) - * FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A - * WHERE P.id = A.seller; - * </pre> - * - * <p>To make things a bit more dynamic and easier to test we'll use a much shorter window. - */ -public class Query8 extends NexmarkQuery { - public Query8(NexmarkConfiguration configuration) { - super(configuration, "Query8"); - } - - private PCollection<IdNameReserve> applyTyped(PCollection<Event> events) { - // Window and key new people by their id. - PCollection<KV<Long, Person>> personsById = - events - .apply(JUST_NEW_PERSONS) - .apply("Query8.WindowPersons", - Window.<Person>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))) - .apply("PersonById", PERSON_BY_ID); - - // Window and key new auctions by their id. - PCollection<KV<Long, Auction>> auctionsBySeller = - events.apply(JUST_NEW_AUCTIONS) - .apply("Query8.WindowAuctions", - Window.<Auction>into( - FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))) - .apply("AuctionBySeller", AUCTION_BY_SELLER); - - // Join people and auctions and project the person id, name and auction reserve price. - return KeyedPCollectionTuple.of(PERSON_TAG, personsById) - .and(AUCTION_TAG, auctionsBySeller) - .apply(CoGroupByKey.<Long>create()) - .apply(name + ".Select", - ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() { - @ProcessElement - public void processElement(ProcessContext c) { - Person person = c.element().getValue().getOnly(PERSON_TAG, null); - if (person == null) { - // Person was not created in last window period. - return; - } - for (Auction auction : c.element().getValue().getAll(AUCTION_TAG)) { - c.output(new IdNameReserve(person.id, person.name, auction.reserve)); - } - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java deleted file mode 100644 index 8c76bc6..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.IdNameReserve; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A direct implementation of {@link Query8}. - */ -public class Query8Model extends NexmarkQueryModel implements Serializable { - /** - * Simulator for query 8. - */ - private class Simulator extends AbstractSimulator<Event, IdNameReserve> { - /** New persons seen in the current window, indexed by id. */ - private final Map<Long, Person> newPersons; - - /** New auctions seen in the current window, indexed by seller id. */ - private final Multimap<Long, Auction> newAuctions; - - /** When did the current window start. */ - private Instant windowStart; - - public Simulator(NexmarkConfiguration configuration) { - super(NexmarkUtils.standardEventIterator(configuration)); - newPersons = new HashMap<>(); - newAuctions = ArrayListMultimap.create(); - windowStart = NexmarkUtils.BEGINNING_OF_TIME; - } - - /** - * Retire all persons added in last window. - */ - private void retirePersons() { - for (Map.Entry<Long, Person> entry : newPersons.entrySet()) { - NexmarkUtils.info("retire: %s", entry.getValue()); - } - newPersons.clear(); - } - - /** - * Retire all auctions added in last window. - */ - private void retireAuctions() { - for (Map.Entry<Long, Auction> entry : newAuctions.entries()) { - NexmarkUtils.info("retire: %s", entry.getValue()); - } - newAuctions.clear(); - } - - /** - * Capture new result. - */ - private void addResult(Auction auction, Person person, Instant timestamp) { - addResult(TimestampedValue.of( - new IdNameReserve(person.id, person.name, auction.reserve), timestamp)); - } - - @Override - public void run() { - TimestampedValue<Event> timestampedEvent = nextInput(); - if (timestampedEvent == null) { - allDone(); - return; - } - - Event event = timestampedEvent.getValue(); - if (event.bid != null) { - // Ignore bid events. - // Keep looking for next events. - return; - } - Instant timestamp = timestampedEvent.getTimestamp(); - Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec), - Duration.standardSeconds(configuration.windowSizeSec), timestamp); - if (!newWindowStart.equals(windowStart)) { - // Retire this window. - retirePersons(); - retireAuctions(); - windowStart = newWindowStart; - } - - if (event.newAuction != null) { - // Join new auction with existing person, if any. - Person person = newPersons.get(event.newAuction.seller); - if (person != null) { - addResult(event.newAuction, person, timestamp); - } else { - // Remember auction for future new people. - newAuctions.put(event.newAuction.seller, event.newAuction); - } - } else { // event is not an auction, nor a bid, so it is a person - // Join new person with existing auctions. - for (Auction auction : newAuctions.get(event.newPerson.id)) { - addResult(auction, event.newPerson, timestamp); - } - // We'll never need these auctions again. - newAuctions.removeAll(event.newPerson.id); - // Remember person for future auctions. - newPersons.put(event.newPerson.id, event.newPerson); - } - } - } - - public Query8Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new Simulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValue(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java deleted file mode 100644 index 6dd189d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.values.PCollection; - -/** - * Query "9", 'Winning bids'. Select just the winning bids. Not in original NEXMark suite, but - * handy for testing. See {@link WinningBids} for the details. - */ -public class Query9 extends NexmarkQuery { - public Query9(NexmarkConfiguration configuration) { - super(configuration, "Query9"); - } - - private PCollection<AuctionBid> applyTyped(PCollection<Event> events) { - return events.apply(new WinningBids(name, configuration)); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java deleted file mode 100644 index d117e2d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.sdk.values.TimestampedValue; - -/** - * A direct implementation of {@link Query9}. - */ -public class Query9Model extends NexmarkQueryModel implements Serializable { - public Query9Model(NexmarkConfiguration configuration) { - super(configuration); - } - - @Override - public AbstractSimulator<?, ?> simulator() { - return new WinningBidsSimulator(configuration); - } - - @Override - protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) { - return toValue(itr); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java deleted file mode 100644 index d4ca177..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark.queries; - -import static com.google.common.base.Preconditions.checkState; - -import com.fasterxml.jackson.annotation.JsonCreator; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.TreeMap; -import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.AuctionBid; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.sources.GeneratorConfig; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Instant; - -/** - * A transform to find the winning bid for each closed auction. In pseudo CQL syntax: - * - * <pre>{@code - * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime) - * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] - * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME - * GROUP BY A.id - * }</pre> - * - * <p>We will also check that the winning bid is above the auction reserve. Note that - * we ignore the auction opening bid value since it has no impact on which bid eventually wins, - * if any. - * - * <p>Our implementation will use a custom windowing function in order to bring bids and - * auctions together without requiring global state. - */ -public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> { - /** Windows for open auctions and bids. */ - private static class AuctionOrBidWindow extends IntervalWindow { - /** Id of auction this window is for. */ - public final long auction; - - /** - * True if this window represents an actual auction, and thus has a start/end - * time matching that of the auction. False if this window represents a bid, and - * thus has an unbounded start/end time. - */ - public final boolean isAuctionWindow; - - /** For avro only. */ - private AuctionOrBidWindow() { - super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE); - auction = 0; - isAuctionWindow = false; - } - - private AuctionOrBidWindow( - Instant start, Instant end, long auctionId, boolean isAuctionWindow) { - super(start, end); - this.auction = auctionId; - this.isAuctionWindow = isAuctionWindow; - } - - /** Return an auction window for {@code auction}. */ - public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) { - return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true); - } - - /** - * Return a bid window for {@code bid}. It should later be merged into - * the corresponding auction window. However, it is possible this bid is for an already - * expired auction, or for an auction which the system has not yet seen. So we - * give the bid a bit of wiggle room in its interval. - */ - public static AuctionOrBidWindow forBid( - long expectedAuctionDurationMs, Instant timestamp, Bid bid) { - // At this point we don't know which auctions are still valid, and the bid may - // be for an auction which won't start until some unknown time in the future - // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid). - // A real system would atomically reconcile bids and auctions by a separate mechanism. - // If we give bids an unbounded window it is possible a bid for an auction which - // has already expired would cause the system watermark to stall, since that window - // would never be retired. - // Instead, we will just give the bid a finite window which expires at - // the upper bound of auctions assuming the auction starts at the same time as the bid, - // and assuming the system is running at its lowest event rate (as per interEventDelayUs). - return new AuctionOrBidWindow( - timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false); - } - - /** Is this an auction window? */ - public boolean isAuctionWindow() { - return isAuctionWindow; - } - - @Override - public String toString() { - return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", - start(), end(), auction, isAuctionWindow); - } - - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - AuctionOrBidWindow that = (AuctionOrBidWindow) o; - return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction); - } - - @Override public int hashCode() { - return Objects.hash(isAuctionWindow, auction); - } - } - - /** - * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long. - */ - private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> { - private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder(); - private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder(); - private static final Coder<Long> ID_CODER = VarLongCoder.of(); - private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - - @JsonCreator - public static AuctionOrBidWindowCoder of() { - return INSTANCE; - } - - @Override - public void encode(AuctionOrBidWindow window, OutputStream outStream) - throws IOException, CoderException { - SUPER_CODER.encode(window, outStream); - ID_CODER.encode(window.auction, outStream); - INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream); - } - - @Override - public AuctionOrBidWindow decode(InputStream inStream) - throws IOException, CoderException { - IntervalWindow superWindow = SUPER_CODER.decode(inStream); - long auction = ID_CODER.decode(inStream); - boolean isAuctionWindow = INT_CODER.decode(inStream) != 0; - return new AuctionOrBidWindow( - superWindow.start(), superWindow.end(), auction, isAuctionWindow); - } - - @Override public void verifyDeterministic() throws NonDeterministicException {} - } - - /** Assign events to auction windows and merges them intelligently. */ - private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> { - /** Expected duration of auctions in ms. */ - private final long expectedAuctionDurationMs; - - public AuctionOrBidWindowFn(long expectedAuctionDurationMs) { - this.expectedAuctionDurationMs = expectedAuctionDurationMs; - } - - @Override - public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) { - Event event = c.element(); - if (event.newAuction != null) { - // Assign auctions to an auction window which expires at the auction's close. - return Collections - .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); - } else if (event.bid != null) { - // Assign bids to a temporary bid window which will later be merged into the appropriate - // auction window. - return Collections.singletonList( - AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid)); - } else { - // Don't assign people to any window. They will thus be dropped. - return Collections.emptyList(); - } - } - - @Override - public void mergeWindows(MergeContext c) throws Exception { - // Split and index the auction and bid windows by auction id. - Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>(); - Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>(); - for (AuctionOrBidWindow window : c.windows()) { - if (window.isAuctionWindow()) { - idToTrueAuctionWindow.put(window.auction, window); - } else { - List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction); - if (bidWindows == null) { - bidWindows = new ArrayList<>(); - idToBidAuctionWindows.put(window.auction, bidWindows); - } - bidWindows.add(window); - } - } - - // Merge all 'bid' windows into their corresponding 'auction' window, provided the - // auction has not expired. - for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) { - long auction = entry.getKey(); - AuctionOrBidWindow auctionWindow = entry.getValue(); - List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction); - if (bidWindows != null) { - List<AuctionOrBidWindow> toBeMerged = new ArrayList<>(); - for (AuctionOrBidWindow bidWindow : bidWindows) { - if (bidWindow.start().isBefore(auctionWindow.end())) { - toBeMerged.add(bidWindow); - } - // else: This bid window will remain until its expire time, at which point it - // will expire without ever contributing to an output. - } - if (!toBeMerged.isEmpty()) { - toBeMerged.add(auctionWindow); - c.merge(toBeMerged, auctionWindow); - } - } - } - } - - @Override - public boolean isCompatible(WindowFn<?, ?> other) { - return other instanceof AuctionOrBidWindowFn; - } - - @Override - public Coder<AuctionOrBidWindow> windowCoder() { - return AuctionOrBidWindowCoder.of(); - } - - @Override - public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() { - throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs"); - } - - /** - * Below we will GBK auctions and bids on their auction ids. Then we will reduce those - * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at - * least one valid bid. We would like those output pairs to have a timestamp of the auction's - * expiry (since that's the earliest we know for sure we have the correct winner). We would - * also like to make that winning results are available to following stages at the auction's - * expiry. - * - * <p>Each result of the GBK will have a timestamp of the min of the result of this object's - * assignOutputTime over all records which end up in one of its iterables. Thus we get the - * desired behavior if we ignore each record's timestamp and always return the auction window's - * 'maxTimestamp', which will correspond to the auction's expiry. - * - * <p>In contrast, if this object's assignOutputTime were to return 'inputTimestamp' - * (the usual implementation), then each GBK record will take as its timestamp the minimum of - * the timestamps of all bids and auctions within it, which will always be the auction's - * timestamp. An auction which expires well into the future would thus hold up the watermark - * of the GBK results until that auction expired. That in turn would hold up all winning pairs. - */ - @Override - public Instant getOutputTime( - Instant inputTimestamp, AuctionOrBidWindow window) { - return window.maxTimestamp(); - } - } - - private final AuctionOrBidWindowFn auctionOrBidWindowFn; - - public WinningBids(String name, NexmarkConfiguration configuration) { - super(name); - // What's the expected auction time (when the system is running at the lowest event rate). - long[] interEventDelayUs = configuration.rateShape.interEventDelayUs( - configuration.firstEventRate, configuration.nextEventRate, - configuration.rateUnit, configuration.numEventGenerators); - long longestDelayUs = 0; - for (long interEventDelayU : interEventDelayUs) { - longestDelayUs = Math.max(longestDelayUs, interEventDelayU); - } - // Adjust for proportion of auction events amongst all events. - longestDelayUs = - (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR) - / GeneratorConfig.AUCTION_PROPORTION; - // Adjust for number of in-flight auctions. - longestDelayUs = longestDelayUs * configuration.numInFlightAuctions; - long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000; - NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs); - auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs); - } - - @Override - public PCollection<AuctionBid> expand(PCollection<Event> events) { - // Window auctions and bids into custom auction windows. New people events will be discarded. - // This will allow us to bring bids and auctions together irrespective of how long - // each auction is open for. - events = events.apply("Window", Window.into(auctionOrBidWindowFn)); - - // Key auctions by their id. - PCollection<KV<Long, Auction>> auctionsById = - events.apply(NexmarkQuery.JUST_NEW_AUCTIONS) - .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID); - - // Key bids by their auction id. - PCollection<KV<Long, Bid>> bidsByAuctionId = - events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION); - - // Find the highest price valid bid for each closed auction. - return - // Join auctions and bids. - KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById) - .and(NexmarkQuery.BID_TAG, bidsByAuctionId) - .apply(CoGroupByKey.<Long>create()) - // Filter and select. - .apply(name + ".Join", - ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { - private final Counter noAuctionCounter = Metrics.counter(name, "noAuction"); - private final Counter underReserveCounter = Metrics.counter(name, "underReserve"); - private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids"); - - @ProcessElement - public void processElement(ProcessContext c) { - Auction auction = - c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null); - if (auction == null) { - // We have bids without a matching auction. Give up. - noAuctionCounter.inc(); - return; - } - // Find the current winning bid for auction. - // The earliest bid with the maximum price above the reserve wins. - Bid bestBid = null; - for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) { - // Bids too late for their auction will have been - // filtered out by the window merge function. - checkState(bid.dateTime < auction.expires); - if (bid.price < auction.reserve) { - // Bid price is below auction reserve. - underReserveCounter.inc(); - continue; - } - - if (bestBid == null - || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) { - bestBid = bid; - } - } - if (bestBid == null) { - // We don't have any valid bids for auction. - noValidBidsCounter.inc(); - return; - } - c.output(new AuctionBid(auction, bestBid)); - } - } - )); - } - - @Override - public int hashCode() { - return Objects.hash(auctionOrBidWindowFn); - } - - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - WinningBids that = (WinningBids) o; - return auctionOrBidWindowFn.equals(that.auctionOrBidWindowFn); - } -}
