[Nexmark] Extract PersonGenerator, StringsGenerator, LongGenerator from Generator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7055e0f3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7055e0f3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7055e0f3 Branch: refs/heads/master Commit: 7055e0f349198575caddaf4106cff24b05fae8f4 Parents: 4fce640 Author: Anton Kedin <[email protected]> Authored: Mon Nov 6 15:01:58 2017 -0800 Committer: Anton Kedin <[email protected]> Committed: Wed Nov 15 13:48:37 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/nexmark/sources/Generator.java | 163 +------------------ .../nexmark/sources/utils/LongGenerator.java | 37 +++++ .../nexmark/sources/utils/PersonGenerator.java | 140 ++++++++++++++++ .../nexmark/sources/utils/StringsGenerator.java | 68 ++++++++ .../sdk/nexmark/sources/utils/package-info.java | 22 +++ 5 files changed, 276 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java index 630d0b5..69d4579 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java @@ -18,18 +18,21 @@ package org.apache.beam.sdk.nexmark.sources; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong; +import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId; +import static 
org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId; +import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson; +import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra; +import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString; import java.io.Serializable; -import java.util.Arrays; import java.util.Iterator; -import java.util.List; import java.util.Objects; import java.util.Random; import org.apache.beam.sdk.nexmark.model.Auction; import org.apache.beam.sdk.nexmark.model.Bid; import org.apache.beam.sdk.nexmark.model.Event; -import org.apache.beam.sdk.nexmark.model.Person; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Instant; @@ -53,30 +56,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl */ private static final int NUM_CATEGORIES = 5; - /** Smallest random string size. */ - private static final int MIN_STRING_LENGTH = 3; - - /** - * Keep the number of states small so that the example queries will find results even with - * a small batch of events. - */ - private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(",")); - - private static final List<String> US_CITIES = - Arrays.asList( - ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne") - .split(",")); - - private static final List<String> FIRST_NAMES = - Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(",")); - - private static final List<String> LAST_NAMES = - Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(",")); - /** * Number of yet-to-be-created people and auction ids allowed. 
*/ - private static final int PERSON_ID_LEAD = 10; private static final int AUCTION_ID_LEAD = 10; /** @@ -223,21 +205,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar); } - /** - * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if - * due to generate a person. - */ - private long lastBase0PersonId(long eventId) { - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset >= GeneratorConfig.PERSON_PROPORTION) { - // About to generate an auction or bid. - // Go back to the last person generated in this epoch. - offset = GeneratorConfig.PERSON_PROPORTION - 1; - } - // About to generate a person. - return epoch * GeneratorConfig.PERSON_PROPORTION + offset; - } /** * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if @@ -261,63 +228,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl } return epoch * GeneratorConfig.AUCTION_PROPORTION + offset; } - - /** return a random US state. */ - private static String nextUSState(Random random) { - return US_STATES.get(random.nextInt(US_STATES.size())); - } - - /** Return a random US city. */ - private static String nextUSCity(Random random) { - return US_CITIES.get(random.nextInt(US_CITIES.size())); - } - - /** Return a random person name. */ - private static String nextPersonName(Random random) { - return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " " - + LAST_NAMES.get(random.nextInt(LAST_NAMES.size())); - } - - /** Return a random string of up to {@code maxLength}. 
*/ - private static String nextString(Random random, int maxLength) { - int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH); - StringBuilder sb = new StringBuilder(); - while (len-- > 0) { - if (random.nextInt(13) == 0) { - sb.append(' '); - } else { - sb.append((char) ('a' + random.nextInt(26))); - } - } - return sb.toString().trim(); - } - - /** Return a random string of exactly {@code length}. */ - private static String nextExactString(Random random, int length) { - StringBuilder sb = new StringBuilder(); - while (length-- > 0) { - sb.append((char) ('a' + random.nextInt(26))); - } - return sb.toString(); - } - - /** Return a random email address. */ - private static String nextEmail(Random random) { - return nextString(random, 7) + "@" + nextString(random, 5) + ".com"; - } - - /** Return a random credit card number. */ - private static String nextCreditCard(Random random) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 4; i++) { - if (i > 0) { - sb.append(' '); - } - sb.append(String.format("%04d", random.nextInt(10000))); - } - return sb.toString(); - } - /** Return a random price. */ private static long nextPrice(Random random) { return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0); @@ -342,61 +252,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl return 1L + nextLong(random, Math.max(horizonMs * 2, 1L)); } - /** - * Return a random {@code string} such that {@code currentSize + string.length()} is on average - * {@code averageSize}. - */ - private static String nextExtra(Random random, int currentSize, int desiredAverageSize) { - if (currentSize > desiredAverageSize) { - return ""; - } - desiredAverageSize -= currentSize; - int delta = (int) Math.round(desiredAverageSize * 0.2); - int minSize = desiredAverageSize - delta; - int desiredSize = minSize + (delta == 0 ? 
0 : random.nextInt(2 * delta)); - return nextExactString(random, desiredSize); - } - - /** Return a random long from {@code [0, n)}. */ - private static long nextLong(Random random, long n) { - if (n < Integer.MAX_VALUE) { - return random.nextInt((int) n); - } else { - // WARNING: Very skewed distribution! Bad! - return Math.abs(random.nextLong() % n); - } - } - - /** - * Generate and return a random person with next available id. - */ - private Person nextPerson(long nextEventId, Random random, long timestamp) { - long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID; - String name = nextPersonName(random); - String email = nextEmail(random); - String creditCard = nextCreditCard(random); - String city = nextUSCity(random); - String state = nextUSState(random); - int currentSize = - 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length(); - String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize); - return new Person(id, name, email, creditCard, city, state, timestamp, extra); - } - - /** - * Return a random person id (base 0). - */ - private long nextBase0PersonId(long eventId, Random random) { - // Choose a random person from any of the 'active' people, plus a few 'leads'. - // By limiting to 'active' we ensure the density of bids or auctions per person - // does not decrease over time for long running jobs. - // By choosing a person id ahead of the last valid person id we will make - // newPerson and newAuction events appear to have been swapped in time. - long numPeople = lastBase0PersonId(eventId) + 1; - long activePeople = Math.min(numPeople, config.configuration.numActivePeople); - long n = nextLong(random, activePeople + PERSON_ID_LEAD); - return numPeople - activePeople + n; - } /** * Return a random auction id (base 0). 
@@ -424,7 +279,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl // Choose the first person in the batch of last HOT_SELLER_RATIO people. seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; } else { - seller = nextBase0PersonId(eventId, random); + seller = nextBase0PersonId(eventId, random, config); } seller += GeneratorConfig.FIRST_PERSON_ID; @@ -461,7 +316,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl // last HOT_BIDDER_RATIO people. bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1; } else { - bidder = nextBase0PersonId(eventId, random); + bidder = nextBase0PersonId(eventId, random, config); } bidder += GeneratorConfig.FIRST_PERSON_ID; @@ -513,7 +368,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl Event event; if (rem < GeneratorConfig.PERSON_PROPORTION) { - event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp)); + event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, config)); } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { event = new Event(nextAuction(newEventId, random, adjustedEventTimestamp)); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java new file mode 100644 index 0000000..8eccb66 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.sources.utils; + +import java.util.Random; + +/** + * LongGenerator. + */ +public class LongGenerator { + + /** Return a random long from {@code [0, n)}. */ + public static long nextLong(Random random, long n) { + if (n < Integer.MAX_VALUE) { + return random.nextInt((int) n); + } else { + // WARNING: Very skewed distribution! Bad! + return Math.abs(random.nextLong() % n); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java new file mode 100644 index 0000000..a02fff9 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.sources.utils; + +import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong; +import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra; +import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.nexmark.sources.GeneratorConfig; + +/** + * Generates people. + */ +public class PersonGenerator { + /** + * Number of yet-to-be-created person ids allowed. + */ + private static final int PERSON_ID_LEAD = 10; + + /** + * Keep the number of states small so that the example queries will find results even with + * a small batch of events.
+ */ + private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(",")); + + private static final List<String> US_CITIES = + Arrays.asList( + ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne") + .split(",")); + + private static final List<String> FIRST_NAMES = + Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(",")); + + private static final List<String> LAST_NAMES = + Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(",")); + + + /** + * Generate and return a random person with next available id. + */ + public static Person nextPerson( + long nextEventId, Random random, long timestamp, GeneratorConfig config) { + + long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID; + String name = nextPersonName(random); + String email = nextEmail(random); + String creditCard = nextCreditCard(random); + String city = nextUSCity(random); + String state = nextUSState(random); + int currentSize = + 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length(); + String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize()); + return new Person(id, name, email, creditCard, city, state, timestamp, extra); + } + + /** + * Return a random person id (base 0). + */ + public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) { + // Choose a random person from any of the 'active' people, plus a few 'leads'. + // By limiting to 'active' we ensure the density of bids or auctions per person + // does not decrease over time for long running jobs. + // By choosing a person id ahead of the last valid person id we will make + // newPerson and newAuction events appear to have been swapped in time. 
+ long numPeople = lastBase0PersonId(eventId) + 1; + long activePeople = Math.min(numPeople, config.getNumActivePeople()); + long n = nextLong(random, activePeople + PERSON_ID_LEAD); + return numPeople - activePeople + n; + } + + /** + * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if + * due to generate a person. + */ + public static long lastBase0PersonId(long eventId) { + long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; + long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; + if (offset >= GeneratorConfig.PERSON_PROPORTION) { + // About to generate an auction or bid. + // Go back to the last person generated in this epoch. + offset = GeneratorConfig.PERSON_PROPORTION - 1; + } + // About to generate a person. + return epoch * GeneratorConfig.PERSON_PROPORTION + offset; + } + + + /** Return a random US state. */ + private static String nextUSState(Random random) { + return US_STATES.get(random.nextInt(US_STATES.size())); + } + + /** Return a random US city. */ + private static String nextUSCity(Random random) { + return US_CITIES.get(random.nextInt(US_CITIES.size())); + } + + /** Return a random person name. */ + private static String nextPersonName(Random random) { + return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " " + + LAST_NAMES.get(random.nextInt(LAST_NAMES.size())); + } + + /** Return a random email address. */ + private static String nextEmail(Random random) { + return nextString(random, 7) + "@" + nextString(random, 5) + ".com"; + } + + /** Return a random credit card number.
*/ + private static String nextCreditCard(Random random) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 4; i++) { + if (i > 0) { + sb.append(' '); + } + sb.append(String.format("%04d", random.nextInt(10000))); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java new file mode 100644 index 0000000..4e69a9d --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.nexmark.sources.utils; + +import java.util.Random; + +/** + * Generates strings which are used for different fields in other model objects. + */ +public class StringsGenerator { + + /** Smallest random string size.
*/ + private static final int MIN_STRING_LENGTH = 3; + + /** Return a random string of fewer than {@code maxLength} characters. */ + public static String nextString(Random random, int maxLength) { + int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH); + StringBuilder sb = new StringBuilder(); + while (len-- > 0) { + if (random.nextInt(13) == 0) { + sb.append(' '); + } else { + sb.append((char) ('a' + random.nextInt(26))); + } + } + return sb.toString().trim(); + } + + /** Return a random string of exactly {@code length}. */ + public static String nextExactString(Random random, int length) { + StringBuilder sb = new StringBuilder(); + while (length-- > 0) { + sb.append((char) ('a' + random.nextInt(26))); + } + return sb.toString(); + } + + /** + * Return a random {@code string} such that {@code currentSize + string.length()} is on average + * {@code desiredAverageSize}. + */ + public static String nextExtra(Random random, int currentSize, int desiredAverageSize) { + if (currentSize > desiredAverageSize) { + return ""; + } + desiredAverageSize -= currentSize; + int delta = (int) Math.round(desiredAverageSize * 0.2); + int minSize = desiredAverageSize - delta; + int desiredSize = minSize + (delta == 0 ?
0 : random.nextInt(2 * delta)); + return nextExactString(random, desiredSize); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7055e0f3/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java new file mode 100644 index 0000000..e09564a --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utility classes for Generator. + */ +package org.apache.beam.sdk.nexmark.sources.utils;
