[Nexmark] Extract AuctionGenerator, PriceGenerator from Generator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e895fc82 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e895fc82 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e895fc82 Branch: refs/heads/master Commit: e895fc8294c581d4004006cd898a05300ac7be12 Parents: 7055e0f Author: Anton Kedin <ke...@google.com> Authored: Mon Nov 6 15:12:53 2017 -0800 Committer: Anton Kedin <ke...@google.com> Committed: Wed Nov 15 13:48:37 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/nexmark/sources/Generator.java | 111 +------------- .../nexmark/sources/utils/AuctionGenerator.java | 145 +++++++++++++++++++ .../nexmark/sources/utils/PriceGenerator.java | 32 ++++ 3 files changed, 184 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java index 69d4579..68e6748 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java @@ -18,19 +18,20 @@ package org.apache.beam.sdk.nexmark.sources; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong; +import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.lastBase0AuctionId; +import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextAuction; +import static org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextBase0AuctionId; import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId; import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId; import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson; +import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice; import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra; -import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString; import java.io.Serializable; import java.util.Iterator; import java.util.Objects; import java.util.Random; -import org.apache.beam.sdk.nexmark.model.Auction; import org.apache.beam.sdk.nexmark.model.Bid; import org.apache.beam.sdk.nexmark.model.Event; import org.apache.beam.sdk.values.TimestampedValue; @@ -50,23 +51,12 @@ import org.joda.time.Instant; * so that we can resume generating events from a saved snapshot. */ public class Generator implements Iterator<TimestampedValue<Event>>, Serializable { - /** - * Keep the number of categories small so the example queries will find results even with - * a small batch of events. - */ - private static final int NUM_CATEGORIES = 5; - - /** - * Number of yet-to-be-created people and auction ids allowed. - */ - private static final int AUCTION_ID_LEAD = 10; /** * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1 * over these values. */ private static final int HOT_AUCTION_RATIO = 100; - private static final int HOT_SELLER_RATIO = 100; private static final int HOT_BIDDER_RATIO = 100; /** @@ -206,94 +196,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl } - /** - * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if - * due to generate an auction. - */ - private long lastBase0AuctionId(long eventId) { - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset < GeneratorConfig.PERSON_PROPORTION) { - // About to generate a person. - // Go back to the last auction in the last epoch. - epoch--; - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - // About to generate a bid. - // Go back to the last auction generated in this epoch. - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else { - // About to generate an auction. - offset -= GeneratorConfig.PERSON_PROPORTION; - } - return epoch * GeneratorConfig.AUCTION_PROPORTION + offset; - } - /** Return a random price. */ - private static long nextPrice(Random random) { - return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0); - } - - /** Return a random time delay, in milliseconds, for length of auctions. */ - private long nextAuctionLengthMs(Random random, long timestamp) { - // What's our current event number? - long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar); - // How many events till we've generated numInFlightAuctions? - long numEventsForAuctions = - (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR) - / GeneratorConfig.AUCTION_PROPORTION; - // When will the auction numInFlightAuctions beyond now be generated? - long futureAuction = - config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions) - .getKey(); - // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n", - // futureAuction - timestamp, numEventsForAuctions); - // Choose a length with average horizonMs. - long horizonMs = futureAuction - timestamp; - return 1L + nextLong(random, Math.max(horizonMs * 2, 1L)); - } - - - /** - * Return a random auction id (base 0). - */ - private long nextBase0AuctionId(long nextEventId, Random random) { - // Choose a random auction for any of those which are likely to still be in flight, - // plus a few 'leads'. - // Note that ideally we'd track non-expired auctions exactly, but that state - // is difficult to split. - long minAuction = Math.max( - lastBase0AuctionId(nextEventId) - config.configuration.numInFlightAuctions, 0); - long maxAuction = lastBase0AuctionId(nextEventId); - return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD); - } - - /** - * Generate and return a random auction with next available id. - */ - private Auction nextAuction(long eventId, Random random, long timestamp) { - long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID; - - long seller; - // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio. - if (random.nextInt(config.configuration.hotSellersRatio) > 0) { - // Choose the first person in the batch of last HOT_SELLER_RATIO people. - seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; - } else { - seller = nextBase0PersonId(eventId, random, config); - } - seller += GeneratorConfig.FIRST_PERSON_ID; - - long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); - long initialBid = nextPrice(random); - long expires = timestamp + nextAuctionLengthMs(random, timestamp); - String name = nextString(random, 20); - String desc = nextString(random, 100); - long reserve = initialBid + nextPrice(random); - int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; - String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); - return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category, - extra); - } /** * Generate and return a random bid with next available id. @@ -305,7 +207,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions. auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO; } else { - auction = nextBase0AuctionId(eventId, random); + auction = nextBase0AuctionId(eventId, random, config); } auction += GeneratorConfig.FIRST_AUCTION_ID; @@ -370,7 +272,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl if (rem < GeneratorConfig.PERSON_PROPORTION) { event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config)); } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - event = new Event(nextAuction(newEventId, random, adjustedEventTimestamp)); + event = new Event( + nextAuction(eventsCountSoFar, newEventId, random, adjustedEventTimestamp, config)); } else { event = new Event(nextBid(newEventId, random, adjustedEventTimestamp)); } http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java new file mode 100644 index 0000000..90918d6 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.sources.utils; + +import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong; +import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId; +import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId; +import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice; +import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra; +import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString; + +import java.util.Random; + +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.sources.GeneratorConfig; + +/** + * AuctionGenerator. + */ +public class AuctionGenerator { + /** + * Keep the number of categories small so the example queries will find results even with + * a small batch of events. + */ + private static final int NUM_CATEGORIES = 5; + + /** + * Number of yet-to-be-created people and auction ids allowed. + */ + private static final int AUCTION_ID_LEAD = 10; + + /** + * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1 + * over these values. + */ + private static final int HOT_SELLER_RATIO = 100; + + /** + * Generate and return a random auction with next available id. + */ + public static Auction nextAuction( + long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) { + + long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID; + + long seller; + // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio. + if (random.nextInt(config.getHotSellersRatio()) > 0) { + // Choose the first person in the batch of last HOT_SELLER_RATIO people. + seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; + } else { + seller = nextBase0PersonId(eventId, random, config); + } + seller += GeneratorConfig.FIRST_PERSON_ID; + + long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); + long initialBid = nextPrice(random); + long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config); + String name = nextString(random, 20); + String desc = nextString(random, 100); + long reserve = initialBid + nextPrice(random); + int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; + String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); + return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category, + extra); + } + + /** + * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if + * due to generate an auction. + */ + public static long lastBase0AuctionId(long eventId) { + long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; + long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; + if (offset < GeneratorConfig.PERSON_PROPORTION) { + // About to generate a person. + // Go back to the last auction in the last epoch. + epoch--; + offset = GeneratorConfig.AUCTION_PROPORTION - 1; + } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { + // About to generate a bid. + // Go back to the last auction generated in this epoch. + offset = GeneratorConfig.AUCTION_PROPORTION - 1; + } else { + // About to generate an auction. + offset -= GeneratorConfig.PERSON_PROPORTION; + } + return epoch * GeneratorConfig.AUCTION_PROPORTION + offset; + } + + /** + * Return a random auction id (base 0). + */ + public static long nextBase0AuctionId( + long nextEventId, Random random, GeneratorConfig config) { + + // Choose a random auction for any of those which are likely to still be in flight, + // plus a few 'leads'. + // Note that ideally we'd track non-expired auctions exactly, but that state + // is difficult to split. + long minAuction = Math.max( + lastBase0AuctionId(nextEventId) - config.getNumInFlightAuctions(), 0); + long maxAuction = lastBase0AuctionId(nextEventId); + return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD); + } + + /** Return a random time delay, in milliseconds, for length of auctions. */ + private static long nextAuctionLengthMs( + long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) { + + // What's our current event number? + long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar); + // How many events till we've generated numInFlightAuctions? + long numEventsForAuctions = + (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR) + / GeneratorConfig.AUCTION_PROPORTION; + // When will the auction numInFlightAuctions beyond now be generated? + long futureAuction = + config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions) + .getKey(); + // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n", + // futureAuction - timestamp, numEventsForAuctions); + // Choose a length with average horizonMs. + long horizonMs = futureAuction - timestamp; + return 1L + nextLong(random, Math.max(horizonMs * 2, 1L)); + } + + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java new file mode 100644 index 0000000..9dae1ca --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.sources.utils; + +import java.util.Random; + +/** + * Generates a random price. + */ +public class PriceGenerator { + + /** Return a random price. */ + public static long nextPrice(Random random) { + return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0); + } +}