[Nexmark] Extract AuctionGenerator, PriceGenerator from Generator

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e895fc82
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e895fc82
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e895fc82

Branch: refs/heads/master
Commit: e895fc8294c581d4004006cd898a05300ac7be12
Parents: 7055e0f
Author: Anton Kedin <ke...@google.com>
Authored: Mon Nov 6 15:12:53 2017 -0800
Committer: Anton Kedin <ke...@google.com>
Committed: Wed Nov 15 13:48:37 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/nexmark/sources/Generator.java     | 111 +-------------
 .../nexmark/sources/utils/AuctionGenerator.java | 145 +++++++++++++++++++
 .../nexmark/sources/utils/PriceGenerator.java   |  32 ++++
 3 files changed, 184 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
index 69d4579..68e6748 100644
--- 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java
@@ -18,19 +18,20 @@
 package org.apache.beam.sdk.nexmark.sources;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.lastBase0AuctionId;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextAuction;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.AuctionGenerator.nextBase0AuctionId;
 import static 
org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
 import static 
org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
 import static 
org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextPerson;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
 import static 
org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-import static 
org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
 
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.Random;
 
-import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -50,23 +51,12 @@ import org.joda.time.Instant;
  * so that we can resume generating events from a saved snapshot.
  */
 public class Generator implements Iterator<TimestampedValue<Event>>, 
Serializable {
-  /**
-   * Keep the number of categories small so the example queries will find 
results even with
-   * a small batch of events.
-   */
-  private static final int NUM_CATEGORIES = 5;
-
-  /**
-   * Number of yet-to-be-created people and auction ids allowed.
-   */
-  private static final int AUCTION_ID_LEAD = 10;
 
   /**
    * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions 
are 1
    * over these values.
    */
   private static final int HOT_AUCTION_RATIO = 100;
-  private static final int HOT_SELLER_RATIO = 100;
   private static final int HOT_BIDDER_RATIO = 100;
 
   /**
@@ -206,94 +196,6 @@ public class Generator implements 
Iterator<TimestampedValue<Event>>, Serializabl
   }
 
 
-  /**
-   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the 
current auction id if
-   * due to generate an auction.
-   */
-  private long lastBase0AuctionId(long eventId) {
-    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-    if (offset < GeneratorConfig.PERSON_PROPORTION) {
-      // About to generate a person.
-      // Go back to the last auction in the last epoch.
-      epoch--;
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + 
GeneratorConfig.AUCTION_PROPORTION) {
-      // About to generate a bid.
-      // Go back to the last auction generated in this epoch.
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else {
-      // About to generate an auction.
-      offset -= GeneratorConfig.PERSON_PROPORTION;
-    }
-    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
-  }
-  /** Return a random price. */
-  private static long nextPrice(Random random) {
-    return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
-  }
-
-  /** Return a random time delay, in milliseconds, for length of auctions. */
-  private long nextAuctionLengthMs(Random random, long timestamp) {
-    // What's our current event number?
-    long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
-    // How many events till we've generated numInFlightAuctions?
-    long numEventsForAuctions =
-        (config.configuration.numInFlightAuctions * 
GeneratorConfig.PROPORTION_DENOMINATOR)
-        / GeneratorConfig.AUCTION_PROPORTION;
-    // When will the auction numInFlightAuctions beyond now be generated?
-    long futureAuction =
-        config.timestampAndInterEventDelayUsForEvent(currentEventNumber + 
numEventsForAuctions)
-            .getKey();
-    // System.out.printf("*** auction will be for %dms (%d events ahead) 
***\n",
-    //     futureAuction - timestamp, numEventsForAuctions);
-    // Choose a length with average horizonMs.
-    long horizonMs = futureAuction - timestamp;
-    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
-  }
-
-
-  /**
-   * Return a random auction id (base 0).
-   */
-  private long nextBase0AuctionId(long nextEventId, Random random) {
-    // Choose a random auction for any of those which are likely to still be 
in flight,
-    // plus a few 'leads'.
-    // Note that ideally we'd track non-expired auctions exactly, but that 
state
-    // is difficult to split.
-    long minAuction = Math.max(
-        lastBase0AuctionId(nextEventId) - 
config.configuration.numInFlightAuctions, 0);
-    long maxAuction = lastBase0AuctionId(nextEventId);
-    return minAuction + nextLong(random, maxAuction - minAuction + 1 + 
AUCTION_ID_LEAD);
-  }
-
-  /**
-   * Generate and return a random auction with next available id.
-   */
-  private Auction nextAuction(long eventId, Random random, long timestamp) {
-    long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
-
-    long seller;
-    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
-    if (random.nextInt(config.configuration.hotSellersRatio) > 0) {
-      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
-      seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * 
HOT_SELLER_RATIO;
-    } else {
-      seller = nextBase0PersonId(eventId, random, config);
-    }
-    seller += GeneratorConfig.FIRST_PERSON_ID;
-
-    long category = GeneratorConfig.FIRST_CATEGORY_ID + 
random.nextInt(NUM_CATEGORIES);
-    long initialBid = nextPrice(random);
-    long expires = timestamp + nextAuctionLengthMs(random, timestamp);
-    String name = nextString(random, 20);
-    String desc = nextString(random, 100);
-    long reserve = initialBid + nextPrice(random);
-    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
-    String extra = nextExtra(random, currentSize, 
config.configuration.avgAuctionByteSize);
-    return new Auction(id, name, desc, initialBid, reserve, timestamp, 
expires, seller, category,
-        extra);
-  }
 
   /**
    * Generate and return a random bid with next available id.
@@ -305,7 +207,7 @@ public class Generator implements 
Iterator<TimestampedValue<Event>>, Serializabl
       // Choose the first auction in the batch of last HOT_AUCTION_RATIO 
auctions.
       auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * 
HOT_AUCTION_RATIO;
     } else {
-      auction = nextBase0AuctionId(eventId, random);
+      auction = nextBase0AuctionId(eventId, random, config);
     }
     auction += GeneratorConfig.FIRST_AUCTION_ID;
 
@@ -370,7 +272,8 @@ public class Generator implements 
Iterator<TimestampedValue<Event>>, Serializabl
     if (rem < GeneratorConfig.PERSON_PROPORTION) {
       event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp, 
config));
     } else if (rem < GeneratorConfig.PERSON_PROPORTION + 
GeneratorConfig.AUCTION_PROPORTION) {
-      event = new Event(nextAuction(newEventId, random, 
adjustedEventTimestamp));
+      event = new Event(
+          nextAuction(eventsCountSoFar, newEventId, random, 
adjustedEventTimestamp, config));
     } else {
       event = new Event(nextBid(newEventId, random, adjustedEventTimestamp));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
new file mode 100644
index 0000000..90918d6
--- /dev/null
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.utils;
+
+import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
+import static 
org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
+
+import java.util.Random;
+
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+
+/**
+ * AuctionGenerator.
+ */
+public class AuctionGenerator {
+  /**
+   * Keep the number of categories small so the example queries will find 
results even with
+   * a small batch of events.
+   */
+  private static final int NUM_CATEGORIES = 5;
+
+  /**
+   * Number of yet-to-be-created people and auction ids allowed.
+   */
+  private static final int AUCTION_ID_LEAD = 10;
+
+  /**
+   * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions 
are 1
+   * over these values.
+   */
+  private static final int HOT_SELLER_RATIO = 100;
+
+  /**
+   * Generate and return a random auction with next available id.
+   */
+  public static Auction nextAuction(
+      long eventsCountSoFar, long eventId, Random random, long timestamp, 
GeneratorConfig config) {
+
+    long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
+
+    long seller;
+    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
+    if (random.nextInt(config.getHotSellersRatio()) > 0) {
+      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
+      seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * 
HOT_SELLER_RATIO;
+    } else {
+      seller = nextBase0PersonId(eventId, random, config);
+    }
+    seller += GeneratorConfig.FIRST_PERSON_ID;
+
+    long category = GeneratorConfig.FIRST_CATEGORY_ID + 
random.nextInt(NUM_CATEGORIES);
+    long initialBid = nextPrice(random);
+    long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, 
timestamp, config);
+    String name = nextString(random, 20);
+    String desc = nextString(random, 100);
+    long reserve = initialBid + nextPrice(random);
+    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
+    String extra = nextExtra(random, currentSize, 
config.configuration.avgAuctionByteSize);
+    return new Auction(id, name, desc, initialBid, reserve, timestamp, 
expires, seller, category,
+        extra);
+  }
+
+  /**
+   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the 
current auction id if
+   * due to generate an auction.
+   */
+  public static long lastBase0AuctionId(long eventId) {
+    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
+    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
+    if (offset < GeneratorConfig.PERSON_PROPORTION) {
+      // About to generate a person.
+      // Go back to the last auction in the last epoch.
+      epoch--;
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + 
GeneratorConfig.AUCTION_PROPORTION) {
+      // About to generate a bid.
+      // Go back to the last auction generated in this epoch.
+      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
+    } else {
+      // About to generate an auction.
+      offset -= GeneratorConfig.PERSON_PROPORTION;
+    }
+    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
+  }
+
+  /**
+   * Return a random auction id (base 0).
+   */
+  public static long nextBase0AuctionId(
+      long nextEventId, Random random, GeneratorConfig config) {
+
+    // Choose a random auction for any of those which are likely to still be 
in flight,
+    // plus a few 'leads'.
+    // Note that ideally we'd track non-expired auctions exactly, but that 
state
+    // is difficult to split.
+    long minAuction = Math.max(
+        lastBase0AuctionId(nextEventId) - config.getNumInFlightAuctions(), 0);
+    long maxAuction = lastBase0AuctionId(nextEventId);
+    return minAuction + nextLong(random, maxAuction - minAuction + 1 + 
AUCTION_ID_LEAD);
+  }
+
+  /** Return a random time delay, in milliseconds, for length of auctions. */
+  private static long nextAuctionLengthMs(
+      long eventsCountSoFar, Random random, long timestamp, GeneratorConfig 
config) {
+
+    // What's our current event number?
+    long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
+    // How many events till we've generated numInFlightAuctions?
+    long numEventsForAuctions =
+        (config.configuration.numInFlightAuctions * 
GeneratorConfig.PROPORTION_DENOMINATOR)
+            / GeneratorConfig.AUCTION_PROPORTION;
+    // When will the auction numInFlightAuctions beyond now be generated?
+    long futureAuction =
+        config.timestampAndInterEventDelayUsForEvent(currentEventNumber + 
numEventsForAuctions)
+            .getKey();
+    // System.out.printf("*** auction will be for %dms (%d events ahead) 
***\n",
+    //     futureAuction - timestamp, numEventsForAuctions);
+    // Choose a length with average horizonMs.
+    long horizonMs = futureAuction - timestamp;
+    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e895fc82/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
new file mode 100644
index 0000000..9dae1ca
--- /dev/null
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.sources.utils;
+
+import java.util.Random;
+
+/**
+ * Generates a random price.
+ */
+public class PriceGenerator {
+
+  /** Return a random price. */
+  public static long nextPrice(Random random) {
+    return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
+  }
+}

Reply via email to