Repository: beam
Updated Branches:
  refs/heads/master f10399d7c -> a52dbeaca


http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
deleted file mode 100644
index 90918d6..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/AuctionGenerator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.lastBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PersonGenerator.nextBase0PersonId;
-import static org.apache.beam.sdk.nexmark.sources.utils.PriceGenerator.nextPrice;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
-
-import java.util.Random;
-
-import org.apache.beam.sdk.nexmark.model.Auction;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
-
-/**
- * AuctionGenerator.
- */
-public class AuctionGenerator {
-  /**
-   * Keep the number of categories small so the example queries will find results even with
-   * a small batch of events.
-   */
-  private static final int NUM_CATEGORIES = 5;
-
-  /**
-   * Number of yet-to-be-created people and auction ids allowed.
-   */
-  private static final int AUCTION_ID_LEAD = 10;
-
-  /**
-   * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1
-   * over these values.
-   */
-  private static final int HOT_SELLER_RATIO = 100;
-
-  /**
-   * Generate and return a random auction with next available id.
-   */
-  public static Auction nextAuction(
-      long eventsCountSoFar, long eventId, Random random, long timestamp, GeneratorConfig config) {
-
-    long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID;
-
-    long seller;
-    // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio.
-    if (random.nextInt(config.getHotSellersRatio()) > 0) {
-      // Choose the first person in the batch of last HOT_SELLER_RATIO people.
-      seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO;
-    } else {
-      seller = nextBase0PersonId(eventId, random, config);
-    }
-    seller += GeneratorConfig.FIRST_PERSON_ID;
-
-    long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
-    long initialBid = nextPrice(random);
-    long expires = timestamp + nextAuctionLengthMs(eventsCountSoFar, random, timestamp, config);
-    String name = nextString(random, 20);
-    String desc = nextString(random, 100);
-    long reserve = initialBid + nextPrice(random);
-    int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
-    String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
-    return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
-        extra);
-  }
-
-  /**
-   * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if
-   * due to generate an auction.
-   */
-  public static long lastBase0AuctionId(long eventId) {
-    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-    if (offset < GeneratorConfig.PERSON_PROPORTION) {
-      // About to generate a person.
-      // Go back to the last auction in the last epoch.
-      epoch--;
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) {
-      // About to generate a bid.
-      // Go back to the last auction generated in this epoch.
-      offset = GeneratorConfig.AUCTION_PROPORTION - 1;
-    } else {
-      // About to generate an auction.
-      offset -= GeneratorConfig.PERSON_PROPORTION;
-    }
-    return epoch * GeneratorConfig.AUCTION_PROPORTION + offset;
-  }
-
-  /**
-   * Return a random auction id (base 0).
-   */
-  public static long nextBase0AuctionId(
-      long nextEventId, Random random, GeneratorConfig config) {
-
-    // Choose a random auction for any of those which are likely to still be in flight,
-    // plus a few 'leads'.
-    // Note that ideally we'd track non-expired auctions exactly, but that state
-    // is difficult to split.
-    long minAuction = Math.max(
-        lastBase0AuctionId(nextEventId) - config.getNumInFlightAuctions(), 0);
-    long maxAuction = lastBase0AuctionId(nextEventId);
-    return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD);
-  }
-
-  /** Return a random time delay, in milliseconds, for length of auctions. */
-  private static long nextAuctionLengthMs(
-      long eventsCountSoFar, Random random, long timestamp, GeneratorConfig config) {
-
-    // What's our current event number?
-    long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar);
-    // How many events till we've generated numInFlightAuctions?
-    long numEventsForAuctions =
-        (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR)
-            / GeneratorConfig.AUCTION_PROPORTION;
-    // When will the auction numInFlightAuctions beyond now be generated?
-    long futureAuction =
-        config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions)
-            .getKey();
-    // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n",
-    //     futureAuction - timestamp, numEventsForAuctions);
-    // Choose a length with average horizonMs.
-    long horizonMs = futureAuction - timestamp;
-    return 1L + nextLong(random, Math.max(horizonMs * 2, 1L));
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
deleted file mode 100644
index 8eccb66..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/LongGenerator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import java.util.Random;
-
-/**
- * LongGenerator.
- */
-public class LongGenerator {
-
-  /** Return a random long from {@code [0, n)}. */
-  public static long nextLong(Random random, long n) {
-    if (n < Integer.MAX_VALUE) {
-      return random.nextInt((int) n);
-    } else {
-      // WARNING: Very skewed distribution! Bad!
-      return Math.abs(random.nextLong() % n);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
deleted file mode 100644
index a02fff9..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PersonGenerator.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import static org.apache.beam.sdk.nexmark.sources.utils.LongGenerator.nextLong;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextExtra;
-import static org.apache.beam.sdk.nexmark.sources.utils.StringsGenerator.nextString;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.beam.sdk.nexmark.model.Person;
-import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
-
-/**
- * Generates people.
- */
-public class PersonGenerator {
-  /**
-   * Number of yet-to-be-created people and auction ids allowed.
-   */
-  private static final int PERSON_ID_LEAD = 10;
-
-  /**
-   * Keep the number of states small so that the example queries will find results even with
-   * a small batch of events.
-   */
-  private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(","));
-
-  private static final List<String> US_CITIES =
-      Arrays.asList(
-          ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne")
-              .split(","));
-
-  private static final List<String> FIRST_NAMES =
-      Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(","));
-
-  private static final List<String> LAST_NAMES =
-      Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(","));
-
-
-  /**
-   * Generate and return a random person with next available id.
-   */
-  public static Person nextPerson(
-      long nextEventId, Random random, long timestamp, GeneratorConfig config) {
-
-    long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID;
-    String name = nextPersonName(random);
-    String email = nextEmail(random);
-    String creditCard = nextCreditCard(random);
-    String city = nextUSCity(random);
-    String state = nextUSState(random);
-    int currentSize =
-        8 + name.length() + email.length() + creditCard.length() + city.length() + state.length();
-    String extra = nextExtra(random, currentSize, config.getAvgPersonByteSize());
-    return new Person(id, name, email, creditCard, city, state, timestamp, extra);
-  }
-
-  /**
-   * Return a random person id (base 0).
-   */
-  public static long nextBase0PersonId(long eventId, Random random, GeneratorConfig config) {
-    // Choose a random person from any of the 'active' people, plus a few 'leads'.
-    // By limiting to 'active' we ensure the density of bids or auctions per person
-    // does not decrease over time for long running jobs.
-    // By choosing a person id ahead of the last valid person id we will make
-    // newPerson and newAuction events appear to have been swapped in time.
-    long numPeople = lastBase0PersonId(eventId) + 1;
-    long activePeople = Math.min(numPeople, config.getNumActivePeople());
-    long n = nextLong(random, activePeople + PERSON_ID_LEAD);
-    return numPeople - activePeople + n;
-  }
-
-  /**
-   * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if
-   * due to generate a person.
-   */
-  public static long lastBase0PersonId(long eventId) {
-    long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR;
-    long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR;
-    if (offset >= GeneratorConfig.PERSON_PROPORTION) {
-      // About to generate an auction or bid.
-      // Go back to the last person generated in this epoch.
-      offset = GeneratorConfig.PERSON_PROPORTION - 1;
-    }
-    // About to generate a person.
-    return epoch * GeneratorConfig.PERSON_PROPORTION + offset;
-  }
-
-
-  /** return a random US state. */
-  private static String nextUSState(Random random) {
-    return US_STATES.get(random.nextInt(US_STATES.size()));
-  }
-
-  /** Return a random US city. */
-  private static String nextUSCity(Random random) {
-    return US_CITIES.get(random.nextInt(US_CITIES.size()));
-  }
-
-  /** Return a random person name. */
-  private static String nextPersonName(Random random) {
-    return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " "
-        + LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
-  }
-
-  /** Return a random email address. */
-  private static String nextEmail(Random random) {
-    return nextString(random, 7) + "@" + nextString(random, 5) + ".com";
-  }
-
-  /** Return a random credit card number. */
-  private static String nextCreditCard(Random random) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < 4; i++) {
-      if (i > 0) {
-        sb.append(' ');
-      }
-      sb.append(String.format("%04d", random.nextInt(10000)));
-    }
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
deleted file mode 100644
index 9dae1ca..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/PriceGenerator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import java.util.Random;
-
-/**
- * Generates a random price.
- */
-public class PriceGenerator {
-
-  /** Return a random price. */
-  public static long nextPrice(Random random) {
-    return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
deleted file mode 100644
index 4e69a9d..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/StringsGenerator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.nexmark.sources.utils;
-
-import java.util.Random;
-
-/**
- * Generates strings which are used for different field in other model objects.
- */
-public class StringsGenerator {
-
-  /** Smallest random string size. */
-  private static final int MIN_STRING_LENGTH = 3;
-
-  /** Return a random string of up to {@code maxLength}. */
-  public static String nextString(Random random, int maxLength) {
-    int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH);
-    StringBuilder sb = new StringBuilder();
-    while (len-- > 0) {
-      if (random.nextInt(13) == 0) {
-        sb.append(' ');
-      } else {
-        sb.append((char) ('a' + random.nextInt(26)));
-      }
-    }
-    return sb.toString().trim();
-  }
-
-  /** Return a random string of exactly {@code length}. */
-  public static String nextExactString(Random random, int length) {
-    StringBuilder sb = new StringBuilder();
-    while (length-- > 0) {
-      sb.append((char) ('a' + random.nextInt(26)));
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Return a random {@code string} such that {@code currentSize + string.length()} is on average
-   * {@code averageSize}.
-   */
-  public static String nextExtra(Random random, int currentSize, int desiredAverageSize) {
-    if (currentSize > desiredAverageSize) {
-      return "";
-    }
-    desiredAverageSize -= currentSize;
-    int delta = (int) Math.round(desiredAverageSize * 0.2);
-    int minSize = desiredAverageSize - delta;
-    int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta));
-    return nextExactString(random, desiredSize);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
deleted file mode 100644
index e09564a..0000000
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Utility classes for Generator.
- */
-package org.apache.beam.sdk.nexmark.sources.utils;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
index 3590d64..beef314 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.nexmark.sources;
 
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.NexmarkOptions;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
index 9553d22..fbb8136 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;

http://git-wip-us.apache.org/repos/asf/beam/blob/d8a6fad9/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
index c00d1a3..5c9bf5f 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
@@ -30,6 +30,9 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.generator.Generator;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint;
+import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.junit.Test;

Reply via email to