[Nexmark] Extract GeneratorCheckpoint into a separate class. Move the getNextEventId() call to the top of the call stack.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fce6401 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fce6401 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fce6401 Branch: refs/heads/master Commit: 4fce6401fa1abb91c0e493791c29fa0f2cefe0d3 Parents: 1b3f1c1 Author: Anton Kedin <[email protected]> Authored: Mon Nov 6 14:47:38 2017 -0800 Committer: Anton Kedin <[email protected]> Committed: Wed Nov 15 13:48:37 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/nexmark/sources/Generator.java | 145 ++++++------------- .../nexmark/sources/GeneratorCheckpoint.java | 78 ++++++++++ .../sdk/nexmark/sources/GeneratorConfig.java | 41 ++++++ .../nexmark/sources/UnboundedEventSource.java | 20 +-- .../sources/UnboundedEventSourceTest.java | 3 +- 5 files changed, 179 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java index c368d72..630d0b5 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/Generator.java @@ -19,20 +19,13 @@ package org.apache.beam.sdk.nexmark.sources; import static com.google.common.base.Preconditions.checkNotNull; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.io.Serializable; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Random; -import 
org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.UnboundedSource; + import org.apache.beam.sdk.nexmark.model.Auction; import org.apache.beam.sdk.nexmark.model.Bid; import org.apache.beam.sdk.nexmark.model.Event; @@ -95,55 +88,6 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl private static final int HOT_BIDDER_RATIO = 100; /** - * Just enough state to be able to restore a generator back to where it was checkpointed. - */ - public static class Checkpoint implements UnboundedSource.CheckpointMark { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - /** Coder for this class. */ - public static final Coder<Checkpoint> CODER_INSTANCE = - new CustomCoder<Checkpoint>() { - @Override public void encode(Checkpoint value, OutputStream outStream) - throws CoderException, IOException { - LONG_CODER.encode(value.numEvents, outStream); - LONG_CODER.encode(value.wallclockBaseTime, outStream); - } - - @Override - public Checkpoint decode(InputStream inStream) - throws CoderException, IOException { - long numEvents = LONG_CODER.decode(inStream); - long wallclockBaseTime = LONG_CODER.decode(inStream); - return new Checkpoint(numEvents, wallclockBaseTime); - } - @Override public void verifyDeterministic() throws NonDeterministicException {} - }; - - private final long numEvents; - private final long wallclockBaseTime; - - private Checkpoint(long numEvents, long wallclockBaseTime) { - this.numEvents = numEvents; - this.wallclockBaseTime = wallclockBaseTime; - } - - public Generator toGenerator(GeneratorConfig config) { - return new Generator(config, numEvents, wallclockBaseTime); - } - - @Override - public void finalizeCheckpoint() throws IOException { - // Nothing to finalize. 
- } - - @Override - public String toString() { - return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}", - numEvents, wallclockBaseTime); - } - } - - /** * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then * (arbitrary but stable) event hash order. */ @@ -213,17 +157,17 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl private GeneratorConfig config; /** Number of events generated by this generator. */ - private long numEvents; + private long eventsCountSoFar; /** * Wallclock time at which we emitted the first event (ms since epoch). Initially -1. */ private long wallclockBaseTime; - private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) { + Generator(GeneratorConfig config, long eventsCountSoFar, long wallclockBaseTime) { checkNotNull(config); this.config = config; - this.numEvents = numEvents; + this.eventsCountSoFar = eventsCountSoFar; this.wallclockBaseTime = wallclockBaseTime; } @@ -237,8 +181,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** * Return a checkpoint for the current generator. */ - public Checkpoint toCheckpoint() { - return new Checkpoint(numEvents, wallclockBaseTime); + public GeneratorCheckpoint toCheckpoint() { + return new GeneratorCheckpoint(eventsCountSoFar, wallclockBaseTime); } /** @@ -246,7 +190,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl */ public Generator copy() { checkNotNull(config); - Generator result = new Generator(config, numEvents, wallclockBaseTime); + Generator result = new Generator(config, eventsCountSoFar, wallclockBaseTime); return result; } @@ -276,15 +220,14 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl * help with bookkeeping. 
*/ public long getNextEventId() { - return config.firstEventId + config.nextAdjustedEventNumber(numEvents); + return config.firstEventId + config.nextAdjustedEventNumber(eventsCountSoFar); } /** * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if * due to generate a person. */ - private long lastBase0PersonId() { - long eventId = getNextEventId(); + private long lastBase0PersonId(long eventId) { long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; if (offset >= GeneratorConfig.PERSON_PROPORTION) { @@ -300,8 +243,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if * due to generate an auction. */ - private long lastBase0AuctionId() { - long eventId = getNextEventId(); + private long lastBase0AuctionId(long eventId) { long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; if (offset < GeneratorConfig.PERSON_PROPORTION) { @@ -384,7 +326,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** Return a random time delay, in milliseconds, for length of auctions. */ private long nextAuctionLengthMs(Random random, long timestamp) { // What's our current event number? - long currentEventNumber = config.nextAdjustedEventNumber(numEvents); + long currentEventNumber = config.nextAdjustedEventNumber(eventsCountSoFar); // How many events till we've generated numInFlightAuctions? long numEventsForAuctions = (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR) @@ -428,8 +370,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** * Generate and return a random person with next available id. 
*/ - private Person nextPerson(Random random, long timestamp) { - long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID; + private Person nextPerson(long nextEventId, Random random, long timestamp) { + long id = lastBase0PersonId(nextEventId) + GeneratorConfig.FIRST_PERSON_ID; String name = nextPersonName(random); String email = nextEmail(random); String creditCard = nextCreditCard(random); @@ -444,13 +386,13 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** * Return a random person id (base 0). */ - private long nextBase0PersonId(Random random) { + private long nextBase0PersonId(long eventId, Random random) { // Choose a random person from any of the 'active' people, plus a few 'leads'. // By limiting to 'active' we ensure the density of bids or auctions per person // does not decrease over time for long running jobs. // By choosing a person id ahead of the last valid person id we will make // newPerson and newAuction events appear to have been swapped in time. - long numPeople = lastBase0PersonId() + 1; + long numPeople = lastBase0PersonId(eventId) + 1; long activePeople = Math.min(numPeople, config.configuration.numActivePeople); long n = nextLong(random, activePeople + PERSON_ID_LEAD); return numPeople - activePeople + n; @@ -459,29 +401,30 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** * Return a random auction id (base 0). */ - private long nextBase0AuctionId(Random random) { + private long nextBase0AuctionId(long nextEventId, Random random) { // Choose a random auction for any of those which are likely to still be in flight, // plus a few 'leads'. // Note that ideally we'd track non-expired auctions exactly, but that state // is difficult to split. 
- long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0); - long maxAuction = lastBase0AuctionId(); + long minAuction = Math.max( + lastBase0AuctionId(nextEventId) - config.configuration.numInFlightAuctions, 0); + long maxAuction = lastBase0AuctionId(nextEventId); return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD); } /** * Generate and return a random auction with next available id. */ - private Auction nextAuction(Random random, long timestamp) { - long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID; + private Auction nextAuction(long eventId, Random random, long timestamp) { + long id = lastBase0AuctionId(eventId) + GeneratorConfig.FIRST_AUCTION_ID; long seller; // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio. if (random.nextInt(config.configuration.hotSellersRatio) > 0) { // Choose the first person in the batch of last HOT_SELLER_RATIO people. - seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; + seller = (lastBase0PersonId(eventId) / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; } else { - seller = nextBase0PersonId(random); + seller = nextBase0PersonId(eventId, random); } seller += GeneratorConfig.FIRST_PERSON_ID; @@ -500,14 +443,14 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** * Generate and return a random bid with next available id. */ - private Bid nextBid(Random random, long timestamp) { + private Bid nextBid(long eventId, Random random, long timestamp) { long auction; // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio. if (random.nextInt(config.configuration.hotAuctionRatio) > 0) { // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions. 
- auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO; + auction = (lastBase0AuctionId(eventId) / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO; } else { - auction = nextBase0AuctionId(random); + auction = nextBase0AuctionId(eventId, random); } auction += GeneratorConfig.FIRST_AUCTION_ID; @@ -516,9 +459,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl if (random.nextInt(config.configuration.hotBiddersRatio) > 0) { // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of // last HOT_BIDDER_RATIO people. - bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1; + bidder = (lastBase0PersonId(getNextEventId()) / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1; } else { - bidder = nextBase0PersonId(random); + bidder = nextBase0PersonId(eventId, random); } bidder += GeneratorConfig.FIRST_PERSON_ID; @@ -530,7 +473,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl @Override public boolean hasNext() { - return numEvents < config.maxEvents; + return eventsCountSoFar < config.maxEvents; } /** @@ -544,34 +487,40 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl } // When, in event time, we should generate the event. Monotonic. long eventTimestamp = - config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey(); + config.timestampAndInterEventDelayUsForEvent( + config.nextEventNumber(eventsCountSoFar)).getKey(); // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize // may have local jitter. long adjustedEventTimestamp = - config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents)) + config.timestampAndInterEventDelayUsForEvent( + config.nextAdjustedEventNumber(eventsCountSoFar)) .getKey(); // The minimum of this and all future adjusted event timestamps. Accounts for jitter in // the event timestamp. 
long watermark = - config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents)) + config.timestampAndInterEventDelayUsForEvent( + config.nextEventNumberForWatermark(eventsCountSoFar)) .getKey(); // When, in wallclock time, we should emit the event. long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime); // Seed the random number generator with the next 'event id'. Random random = new Random(getNextEventId()); - long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR; + + + long newEventId = getNextEventId(); + long rem = newEventId % GeneratorConfig.PROPORTION_DENOMINATOR; Event event; if (rem < GeneratorConfig.PERSON_PROPORTION) { - event = new Event(nextPerson(random, adjustedEventTimestamp)); + event = new Event(nextPerson(newEventId, random, adjustedEventTimestamp)); } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - event = new Event(nextAuction(random, adjustedEventTimestamp)); + event = new Event(nextAuction(newEventId, random, adjustedEventTimestamp)); } else { - event = new Event(nextBid(random, adjustedEventTimestamp)); + event = new Event(nextBid(newEventId, random, adjustedEventTimestamp)); } - numEvents++; + eventsCountSoFar++; return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark); } @@ -590,7 +539,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl * Return how many microseconds till we emit the next event. */ public long currentInterEventDelayUs() { - return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)) + return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(eventsCountSoFar)) .getValue(); } @@ -598,12 +547,12 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl * Return an estimate of fraction of output consumed. 
*/ public double getFractionConsumed() { - return (double) numEvents / config.maxEvents; + return (double) eventsCountSoFar / config.maxEvents; } @Override public String toString() { - return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config, - numEvents, wallclockBaseTime); + return String.format("Generator{config:%s; eventsCountSoFar:%d; wallclockBaseTime:%d}", config, + eventsCountSoFar, wallclockBaseTime); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java new file mode 100644 index 0000000..dfc135d --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorCheckpoint.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.nexmark.sources; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.UnboundedSource; + +/** + * Just enough state to be able to restore a generator back to where it was checkpointed. + */ +public class GeneratorCheckpoint implements UnboundedSource.CheckpointMark { + private static final Coder<Long> LONG_CODER = VarLongCoder.of(); + + /** Coder for this class. */ + public static final Coder<GeneratorCheckpoint> CODER_INSTANCE = + new CustomCoder<GeneratorCheckpoint>() { + @Override public void encode(GeneratorCheckpoint value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.numEvents, outStream); + LONG_CODER.encode(value.wallclockBaseTime, outStream); + } + + @Override + public GeneratorCheckpoint decode(InputStream inStream) + throws CoderException, IOException { + long numEvents = LONG_CODER.decode(inStream); + long wallclockBaseTime = LONG_CODER.decode(inStream); + return new GeneratorCheckpoint(numEvents, wallclockBaseTime); + } + @Override public void verifyDeterministic() throws NonDeterministicException {} + }; + + private final long numEvents; + private final long wallclockBaseTime; + + GeneratorCheckpoint(long numEvents, long wallclockBaseTime) { + this.numEvents = numEvents; + this.wallclockBaseTime = wallclockBaseTime; + } + + public Generator toGenerator(GeneratorConfig config) { + return new Generator(config, numEvents, wallclockBaseTime); + } + + @Override + public void finalizeCheckpoint() throws IOException { + // Nothing to finalize. 
+ } + + @Override + public String toString() { + return String.format("Generator.GeneratorCheckpoint{numEvents:%d;wallclockBaseTime:%d}", + numEvents, wallclockBaseTime); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java index 42183c6..8e0a899 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/GeneratorConfig.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.nexmark.sources; import java.io.Serializable; import java.util.ArrayList; import java.util.List; + import org.apache.beam.sdk.nexmark.NexmarkConfiguration; import org.apache.beam.sdk.nexmark.model.Event; import org.apache.beam.sdk.values.KV; @@ -186,6 +187,46 @@ public class GeneratorConfig implements Serializable { + numBids * configuration.avgBidByteSize; } + public int getAvgPersonByteSize() { + return configuration.avgPersonByteSize; + } + + public int getNumActivePeople() { + return configuration.numActivePeople; + } + + public int getHotSellersRatio() { + return configuration.hotSellersRatio; + } + + public int getNumInFlightAuctions() { + return configuration.numInFlightAuctions; + } + + public int getHotAuctionRatio() { + return configuration.hotAuctionRatio; + } + + public int getHotBiddersRatio() { + return configuration.hotBiddersRatio; + } + + public int getAvgBidByteSize() { + return configuration.avgBidByteSize; + } + + public int getAvgAuctionByteSize() { + return configuration.avgAuctionByteSize; + } + + public double getProbDelayedEvent() { + return configuration.probDelayedEvent; + } + + public long 
getOccasionalDelaySec() { + return configuration.occasionalDelaySec; + } + /** * Return an estimate of the byte-size of all events a generator for this config would yield. */ http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java index 8f5575c..74eb061 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java @@ -23,7 +23,9 @@ import java.util.NoSuchElementException; import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.ThreadLocalRandom; + import javax.annotation.Nullable; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.nexmark.NexmarkUtils; @@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory; * that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise, * events are returned every time the system asks for one. 
*/ -public class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> { +public class UnboundedEventSource extends UnboundedSource<Event, GeneratorCheckpoint> { private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30); private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class); @@ -161,12 +163,12 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check watermark - next.eventTimestamp); } else if (generator.hasNext()) { next = generator.nextEvent(); - if (isRateLimited && config.configuration.probDelayedEvent > 0.0 - && config.configuration.occasionalDelaySec > 0 - && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) { + if (isRateLimited && config.getProbDelayedEvent() > 0.0 + && config.getOccasionalDelaySec() > 0 + && ThreadLocalRandom.current().nextDouble() < config.getProbDelayedEvent()) { // We'll hold back this event and go around again. long delayMs = - ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000) + ThreadLocalRandom.current().nextLong(config.getOccasionalDelaySec() * 1000) + 1L; LOG.debug("delaying event by {}ms", delayMs); heldBackEvents.add(next.withDelay(delayMs)); @@ -265,7 +267,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check } @Override - public Generator.Checkpoint getCheckpointMark() { + public GeneratorCheckpoint getCheckpointMark() { return generator.toCheckpoint(); } @@ -283,8 +285,8 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check } @Override - public Coder<Generator.Checkpoint> getCheckpointMarkCoder() { - return Generator.Checkpoint.CODER_INSTANCE; + public Coder<GeneratorCheckpoint> getCheckpointMarkCoder() { + return GeneratorCheckpoint.CODER_INSTANCE; } @Override @@ -301,7 +303,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check @Override public EventReader createReader( - 
PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) { + PipelineOptions options, @Nullable GeneratorCheckpoint checkpoint) { if (checkpoint == null) { LOG.trace("creating initial unbounded reader for {}", config); return new EventReader(config); http://git-wip-us.apache.org/repos/asf/beam/blob/4fce6401/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java index 3853ede..c00d1a3 100644 --- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.Random; import java.util.Set; + import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.nexmark.NexmarkConfiguration; @@ -97,7 +98,7 @@ public class UnboundedEventSourceTest { n -= m; System.out.printf("splitting with %d remaining...%n", n); CheckpointMark checkpointMark = reader.getCheckpointMark(); - reader = source.createReader(options, (Generator.Checkpoint) checkpointMark); + reader = source.createReader(options, (GeneratorCheckpoint) checkpointMark); } assertFalse(reader.advance());
