Repository: beam Updated Branches: refs/heads/master f10399d7c -> a52dbeaca
http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java deleted file mode 100644 index 90918d6..0000000 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.nexmark.sources.utils; - -import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong; -import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId; -import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId; -import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice; -import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra; -import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString; - -import java.util.Random; - -import org.apache.beam.sdk.nexmark.model.Auction; -import org.apache.beam.sdk.nexmark.sources.GeneratorConfig; - -/** - * AuctionGenerator. - */ -public class AuctionGenerator { - /** - * Keep the number of categories small so the example queries will find results even with - * a small batch of events. - */ - private static final int NUM_CATEGORIES = 5; - - /** - * Number of yet-to-be-created people and auction ids allowed. - */ - private static final int AUCTION_ID_LEAD = 10; - - /** - * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1 - * over these values. - */ - private static final int HOT_SELLER_RATIO = 100; - - /** - * Generate and return a random auction with next available id. - */ - public static Auction nextAuction( - long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) { - - long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID; - - long seller; - // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio. - if (random.nextInt(config.getHotSellersRatio()) > 0) { - // Choose the first person in the batch of last HOT_SELLER_RATIO people. - seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; - } else { - seller = nextBase0PersonId(eventId, random, config); - } - seller += GeneratorConfig.FIRST_PERSON_ID; - - long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); - long initialBid = nextPrice(random); - long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config); - String name = nextString(random, 20); - String desc = nextString(random, 100); - long reserve = initialBid + nextPrice(random); - int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; - String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); - return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category, - extra); - } - - /** - * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if - * due to generate an auction. - */ - public static long lastBase0AuctionId(long eventId) { - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset < GeneratorConfig.PERSON_PROPORTION) { - // About to generate a person. - // Go back to the last auction in the last epoch. - epoch--; - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - // About to generate a bid. - // Go back to the last auction generated in this epoch. - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else { - // About to generate an auction. - offset -= GeneratorConfig.PERSON_PROPORTION; - } - return epoch * GeneratorConfig.AUCTION_PROPORTION + offset; - } - - /** - * Return a random auction id (base 0). - */ - public static long nextBase0AuctionId( - long nextEventId, Random random, GeneratorConfig config) { - - // Choose a random auction for any of those which are likely to still be in flight, - // plus a few 'leads'. - // Note that ideally we'd track non-expired auctions exactly, but that state - // is difficult to split. - long minAuction = Math.max( - lastBase0AuctionId(nextEventId) - config.getNumInFlightAuctions(), 0); - long maxAuction = lastBase0AuctionId(nextEventId); - return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD); - } - - /** Return a random time delay, in milliseconds, for length of auctions. */ - private static long nextAuctionLengthMs( - long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) { - - // What's our current event number? - long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar); - // How many events till we've generated numInFlightAuctions? - long numEventsForAuctions = - (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR) - / GeneratorConfig.AUCTION_PROPORTION; - // When will the auction numInFlightAuctions beyond now be generated? - long futureAuction = - config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions) - .getKey(); - // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n", - // futureAuction - timestamp, numEventsForAuctions); - // Choose a length with average horizonMs. - long horizonMs = futureAuction - timestamp; - return 1L + nextLong(random, Math.max(horizonMs * 2, 1L)); - } - - -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java deleted file mode 100644 index 8eccb66..0000000 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.nexmark.sources.utils; - -import java.util.Random; - -/** - * LongGenerator. - */ -public class LongGenerator { - - /** Return a random long from {@code [0, n)}. */ - public static long nextLong(Random random, long n) { - if (n < Integer.MAX_VALUE) { - return random.nextInt((int) n); - } else { - // WARNING: Very skewed distribution! Bad! - return Math.abs(random.nextLong() % n); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java deleted file mode 100644 index a02fff9..0000000 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.nexmark.sources.utils; - -import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong; -import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra; -import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString; - -import java.util.Arrays; -import java.util.List; -import java.util.Random; - -import org.apache.beam.sdk.nexmark.model.Person; -import org.apache.beam.sdk.nexmark.sources.GeneratorConfig; - -/** - * Generates people. - */ -public class PersonGenerator { - /** - * Number of yet-to-be-created people and auction ids allowed. - */ - private static final int PERSON_ID_LEAD = 10; - - /** - * Keep the number of states small so that the example queries will find results even with - * a small batch of events. - */ - private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(",")); - - private static final List<String> US_CITIES = - Arrays.asList( - ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne") - .split(",")); - - private static final List<String> FIRST_NAMES = - Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(",")); - - private static final List<String> LAST_NAMES = - Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(",")); - - - /** - * Generate and return a random person with next available id. - */ - public static Person nextPerson( - long nextEventId, Random random, long timestamp, GeneratorConfig config) { - - long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID; - String name = nextPersonName(random); - String email = nextEmail(random); - String creditCard = nextCreditCard(random); - String city = nextUSCity(random); - String state = nextUSState(random); - int currentSize = - 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length(); - String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize()); - return new Person(id, name, email, creditCard, city, state, timestamp, extra); - } - - /** - * Return a random person id (base 0). - */ - public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) { - // Choose a random person from any of the 'active' people, plus a few 'leads'. - // By limiting to 'active' we ensure the density of bids or auctions per person - // does not decrease over time for long running jobs. - // By choosing a person id ahead of the last valid person id we will make - // newPerson and newAuction events appear to have been swapped in time. - long numPeople = lastBase0PersonId(eventId) + 1; - long activePeople = Math.min(numPeople, config.getNumActivePeople()); - long n = nextLong(random, activePeople + PERSON_ID_LEAD); - return numPeople - activePeople + n; - } - - /** - * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if - * due to generate a person. - */ - public static long lastBase0PersonId(long eventId) { - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset >= GeneratorConfig.PERSON_PROPORTION) { - // About to generate an auction or bid. - // Go back to the last person generated in this epoch. - offset = GeneratorConfig.PERSON_PROPORTION - 1; - } - // About to generate a person. - return epoch * GeneratorConfig.PERSON_PROPORTION + offset; - } - - - /** return a random US state. */ - private static String nextUSState(Random random) { - return US_STATES.get(random.nextInt(US_STATES.size())); - } - - /** Return a random US city. */ - private static String nextUSCity(Random random) { - return US_CITIES.get(random.nextInt(US_CITIES.size())); - } - - /** Return a random person name. */ - private static String nextPersonName(Random random) { - return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " " - + LAST_NAMES.get(random.nextInt(LAST_NAMES.size())); - } - - /** Return a random email address. */ - private static String nextEmail(Random random) { - return nextString(random, 7) + "@" + nextString(random, 5) + ".com"; - } - - /** Return a random credit card number. */ - private static String nextCreditCard(Random random) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 4; i++) { - if (i > 0) { - sb.append(' '); - } - sb.append(String.format("%04d", random.nextInt(10000))); - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java deleted file mode 100644 index 9dae1ca..0000000 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.nexmark.sources.utils; - -import java.util.Random; - -/** - * Generates a random price. - */ -public class PriceGenerator { - - /** Return a random price. */ - public static long nextPrice(Random random) { - return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java deleted file mode 100644 index 4e69a9d..0000000 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.nexmark.sources.utils; - -import java.util.Random; - -/** - * Generates strings which are used for different field in other model objects. - */ -public class StringsGenerator { - - /** Smallest random string size. */ - private static final int MIN_STRING_LENGTH = 3; - - /** Return a random string of up to {@code maxLength}. */ - public static String nextString(Random random, int maxLength) { - int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH); - StringBuilder sb = new StringBuilder(); - while (len-- > 0) { - if (random.nextInt(13) == 0) { - sb.append(' '); - } else { - sb.append((char) ('a' + random.nextInt(26))); - } - } - return sb.toString().trim(); - } - - /** Return a random string of exactly {@code length}. */ - public static String nextExactString(Random random, int length) { - StringBuilder sb = new StringBuilder(); - while (length-- > 0) { - sb.append((char) ('a' + random.nextInt(26))); - } - return sb.toString(); - } - - /** - * Return a random {@code string} such that {@code currentSize + string.length()} is on average - * {@code averageSize}. - */ - public static String nextExtra(Random random, int currentSize, int desiredAverageSize) { - if (currentSize > desiredAverageSize) { - return ""; - } - desiredAverageSize -= currentSize; - int delta = (int) Math.round(desiredAverageSize * 0.2); - int minSize = desiredAverageSize - delta; - int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta)); - return nextExactString(random, desiredSize); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java deleted file mode 100644 index e09564a..0000000 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Utility classes for Generator. - */ -package org.apache.beam.sdk.nexmark.sources.utils; http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java index 3590d64..beef314 100644 --- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.nexmark.sources; import org.apache.beam.sdk.nexmark.NexmarkConfiguration; import org.apache.beam.sdk.nexmark.NexmarkOptions; +import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java index 9553d22..fbb8136 100644 --- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.beam.sdk.nexmark.NexmarkConfiguration; +import org.apache.beam.sdk.nexmark.sources.generator.Generator; +import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java index c00d1a3..5c9bf5f 100644 --- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java @@ -30,6 +30,9 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.nexmark.NexmarkConfiguration; import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.sources.generator.Generator; +import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint; +import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.junit.Test;
