Fix and improve query3 and query12. query3: Use GlobalWindow to comply with the State/Timer APIs (issue #7). Use timer for personState expiration in GlobalWindow (issue #29). Add trigger to GlobalWindow.
query12: Replace Count.perKey by Count.perElement (issue #34) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c28b492 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c28b492 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c28b492 Branch: refs/heads/master Commit: 7c28b492aa17160d9a84914814e618716b7beb9f Parents: bd93c8b Author: Etienne Chauchot <[email protected]> Authored: Mon Apr 3 15:18:04 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- .../nexmark/NexmarkConfiguration.java | 19 +- .../integration/nexmark/NexmarkOptions.java | 7 + .../integration/nexmark/queries/Query12.java | 19 +- .../integration/nexmark/queries/Query3.java | 263 +++++++++++-------- .../integration/nexmark/queries/QueryTest.java | 4 + 5 files changed, 195 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java index e2890ed..d6cd808 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java @@ -195,6 +195,13 @@ public class NexmarkConfiguration implements Serializable { public int fanout = 5; /** + * Maximum waiting time to clean personState in query3 + * (ie maximum waiting of the auctions related to person in state in seconds in event time). 
+ */ + @JsonProperty + public int maxAuctionsWaitingTime = 600; + + /** * Length of occasional delay to impose on events (in seconds). */ @JsonProperty @@ -322,6 +329,9 @@ public class NexmarkConfiguration implements Serializable { if (options.getFanout() != null) { fanout = options.getFanout(); } + if (options.getMaxAuctionsWaitingTime() != null) { + maxAuctionsWaitingTime = options.getMaxAuctionsWaitingTime(); + } if (options.getOccasionalDelaySec() != null) { occasionalDelaySec = options.getOccasionalDelaySec(); } @@ -376,6 +386,7 @@ public class NexmarkConfiguration implements Serializable { result.diskBusyBytes = diskBusyBytes; result.auctionSkip = auctionSkip; result.fanout = fanout; + result.maxAuctionsWaitingTime = maxAuctionsWaitingTime; result.occasionalDelaySec = occasionalDelaySec; result.probDelayedEvent = probDelayedEvent; result.maxLogEvents = maxLogEvents; @@ -479,6 +490,9 @@ public class NexmarkConfiguration implements Serializable { if (fanout != DEFAULT.fanout) { sb.append(String.format("; fanout:%d", fanout)); } + if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) { + sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime)); + } if (occasionalDelaySec != DEFAULT.occasionalDelaySec) { sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec)); } @@ -527,7 +541,7 @@ public class NexmarkConfiguration implements Serializable { ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize, avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio, windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople, - coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, + coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, maxAuctionsWaitingTime, occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime, outOfOrderGroupSize); } @@ -571,6 +585,9 @@ public class NexmarkConfiguration implements Serializable { if (fanout != 
other.fanout) { return false; } + if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) { + return false; + } if (firstEventRate != other.firstEventRate) { return false; } http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java index 1be974f..e39f0a4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java @@ -309,6 +309,13 @@ public interface NexmarkOptions extends PubsubOptions { void setFanout(Integer fanout); + @Description("Maximum waiting time to clean personState in query3 " + + "(ie maximum waiting of the auctions related to person in state in seconds in event time).") + @Nullable + Integer getMaxAuctionsWaitingTime(); + + void setMaxAuctionsWaitingTime(Integer maxAuctionsWaitingTime); + @Description("Length of occasional delay to impose on events (in seconds).") @Nullable Long getOccasionalDelaySec(); http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java index c67401b..a5db504 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java +++ 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java @@ -49,16 +49,13 @@ public class Query12 extends NexmarkQuery { private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { return events .apply(JUST_BIDS) - .apply(name + ".Rekey", - // TODO etienne: why not avoid this ParDo and do a Cont.perElement? - ParDo.of(new DoFn<Bid, KV<Long, Void>>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(KV.of(bid.bidder, (Void) null)); - } - })) - .apply(Window.<KV<Long, Void>>into(new GlobalWindows()) + .apply(ParDo.of(new DoFn<Bid, Long>() { + @ProcessElement + public void processElement(ProcessContext c){ + c.output(c.element().bidder); + } + })) + .apply(Window.<Long>into(new GlobalWindows()) .triggering( Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane() @@ -66,7 +63,7 @@ public class Query12 extends NexmarkQuery { Duration.standardSeconds(configuration.windowSizeSec)))) .discardingFiredPanes() .withAllowedLateness(Duration.ZERO)) - .apply(Count.<Long, Void>perKey()) + .apply(Count.<Long>perElement()) .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java index 128c2b7..ba31e9f 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java @@ -39,14 +39,21 @@ import org.apache.beam.sdk.transforms.Sum; import 
org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,31 +69,141 @@ import org.slf4j.LoggerFactory; * </pre> * * <p>We'll implement this query to allow 'new auction' events to come before the 'new person' - * events for the auction seller. Those auctions will be stored until the matching person is - * seen. Then all subsequent auctions for a person will use the stored person record. + * events for the auction seller. Those auctions will be stored until the matching person is seen. + * Then all subsequent auctions for a person will use the stored person record. * * <p>A real system would use an external system to maintain the id-to-person association. 
*/ public class Query3 extends NexmarkQuery { + private static final Logger LOG = LoggerFactory.getLogger(Query3.class); -// private static final StateContext GLOBAL_NAMESPACE = StateContexts.global(); - private static final StateSpec<Object, ValueState<List<Auction>>> AUCTION_LIST_CODED_TAG = - StateSpecs.value(ListCoder.of(Auction.CODER)); - private static final StateSpec<Object, ValueState<Person>> PERSON_CODED_TAG = - StateSpecs.value(Person.CODER); + private final JoinDoFn joinDoFn; + + public Query3(NexmarkConfiguration configuration) { + super(configuration, "Query3"); + joinDoFn = new JoinDoFn(configuration.maxAuctionsWaitingTime); + + } + + @Override + @Nullable + public Aggregator<Long, Long> getFatalCount() { + return joinDoFn.fatalCounter; + } + + private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) { + int numEventsInPane = 30; + + PCollection<Event> eventsWindowed = + events.apply( + Window.<Event>into(new GlobalWindows()) + .triggering(Repeatedly.forever((AfterPane.elementCountAtLeast(numEventsInPane)))) + .discardingFiredPanes() + .withAllowedLateness(Duration.ZERO)); + PCollection<KV<Long, Auction>> auctionsBySellerId = + eventsWindowed + // Only want the new auction events. + .apply(JUST_NEW_AUCTIONS) + + // We only want auctions in category 10. + .apply( + name + ".InCategory", + Filter.by( + new SerializableFunction<Auction, Boolean>() { + + @Override + public Boolean apply(Auction auction) { + return auction.category == 10; + } + })) + + // Key auctions by their seller id. + .apply("AuctionBySeller", AUCTION_BY_SELLER); + + PCollection<KV<Long, Person>> personsById = + eventsWindowed + // Only want the new people events. + .apply(JUST_NEW_PERSONS) + + // We only want people in OR, ID, CA. 
+ .apply( + name + ".InState", + Filter.by( + new SerializableFunction<Person, Boolean>() { + + @Override + public Boolean apply(Person person) { + return person.state.equals("OR") + || person.state.equals("ID") + || person.state.equals("CA"); + } + })) + + // Key people by their id. + .apply("PersonById", PERSON_BY_ID); + + return + // Join auctions and people. + // concatenate KeyedPCollections + KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId) + .and(PERSON_TAG, personsById) + // group auctions and persons by personId + .apply(CoGroupByKey.<Long>create()) + .apply(name + ".Join", ParDo.of(joinDoFn)) + + // Project what we want. + .apply( + name + ".Project", + ParDo.of( + new DoFn<KV<Auction, Person>, NameCityStateId>() { + + @ProcessElement + public void processElement(ProcessContext c) { + Auction auction = c.element().getKey(); + Person person = c.element().getValue(); + c.output( + new NameCityStateId(person.name, person.city, person.state, auction.id)); + } + })); + } + + @Override + protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { + return NexmarkUtils.castToKnownSize(name, applyTyped(events)); + } /** - * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair - * at a time. + * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair at + * a time. * * <p>We know a person may submit any number of auctions. Thus new person event must have the * person record stored in persistent state in order to match future auctions by that person. * - * <p>However we know that each auction is associated with at most one person, so only need - * to store auction records in persistent state until we have seen the corresponding person - * record. And of course may have already seen that record. 
+ * <p>However we know that each auction is associated with at most one person, so only need to + * store auction records in persistent state until we have seen the corresponding person record. + * And of course may have already seen that record. */ private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> { + + private int maxAuctionsWaitingTime; + private static final String AUCTIONS = "auctions"; + private static final String PERSON = "person"; + + @StateId(PERSON) + private static final StateSpec<Object, ValueState<Person>> personSpec = + StateSpecs.value(Person.CODER); + + private static final String PERSON_STATE_EXPIRING = "personStateExpiring"; + + public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs()); + + @StateId(AUCTIONS) + private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec = + StateSpecs.value(ListCoder.of(Auction.CODER)); + + @TimerId(PERSON_STATE_EXPIRING) + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + private final Aggregator<Long, Long> newAuctionCounter = createAggregator("newAuction", Sum.ofLongs()); private final Aggregator<Long, Long> newPersonCounter = @@ -97,20 +214,25 @@ public class Query3 extends NexmarkQuery { createAggregator("newOldOutput", Sum.ofLongs()); private final Aggregator<Long, Long> oldNewOutputCounter = createAggregator("oldNewOutput", Sum.ofLongs()); - public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs()); + + private JoinDoFn(int maxAuctionsWaitingTime) { + this.maxAuctionsWaitingTime = maxAuctionsWaitingTime; + } @ProcessElement - public void processElement(ProcessContext c) throws IOException { - //TODO: This is using the internal state API. 
Rework to use the - //TODO Ismael this is broken for not access to state + public void processElement( + ProcessContext c, + @TimerId(PERSON_STATE_EXPIRING) Timer timer, + @StateId(PERSON) ValueState<Person> personState, + @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) + throws IOException { // We would *almost* implement this by rewindowing into the global window and // running a combiner over the result. The combiner's accumulator would be the // state we use below. However, combiners cannot emit intermediate results, thus // we need to wait for the pending ReduceFn API. -// StateInternals<?> stateInternals = c.windowingInternals().stateInternals(); -// ValueState<Person> personState = stateInternals.state(GLOBAL_NAMESPACE, PERSON_CODED_TAG); -// Person existingPerson = personState.read(); - Person existingPerson = null; + + Person existingPerson = personState.read(); + if (existingPerson != null) { // We've already seen the new person event for this person id. // We can join with any new auctions on-the-fly without needing any @@ -123,8 +245,6 @@ public class Query3 extends NexmarkQuery { return; } -// ValueState<List<Auction>> auctionsState = -// stateInternals.state(GLOBAL_NAMESPACE, AUCTION_LIST_CODED_TAG); Person theNewPerson = null; for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) { if (theNewPerson == null) { @@ -140,14 +260,14 @@ public class Query3 extends NexmarkQuery { } newPersonCounter.addValue(1L); // We've now seen the person for this person id so can flush any - // pending auctions for the same seller id. - List<Auction> pendingAuctions = null; //auctionsState.read(); + // pending auctions for the same seller id (an auction is done by only one seller). 
+ List<Auction> pendingAuctions = auctionsState.read(); if (pendingAuctions != null) { for (Auction pendingAuction : pendingAuctions) { oldNewOutputCounter.addValue(1L); c.output(KV.of(pendingAuction, newPerson)); } -// auctionsState.clear(); + auctionsState.clear(); } // Also deal with any new auctions. for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) { @@ -156,8 +276,11 @@ public class Query3 extends NexmarkQuery { c.output(KV.of(newAuction, newPerson)); } // Remember this person for any future auctions. - -// personState.write(newPerson); + personState.write(newPerson); + //set a time out to clear this state + Instant firingTime = new Instant(newPerson.dateTime) + .plus(Duration.standardSeconds(maxAuctionsWaitingTime)); + timer.set(firingTime); } if (theNewPerson != null) { return; @@ -165,7 +288,7 @@ public class Query3 extends NexmarkQuery { // We'll need to remember the auctions until we see the corresponding // new person event. - List<Auction> pendingAuctions = null; //auctionsState.read(); + List<Auction> pendingAuctions = auctionsState.read(); if (pendingAuctions == null) { pendingAuctions = new ArrayList<>(); } @@ -173,84 +296,14 @@ public class Query3 extends NexmarkQuery { newAuctionCounter.addValue(1L); pendingAuctions.add(newAuction); } -// auctionsState.write(pendingAuctions); + auctionsState.write(pendingAuctions); } + @OnTimer(PERSON_STATE_EXPIRING) + public void onTimerCallback( + OnTimerContext context, + @StateId(PERSON) ValueState<Person> personState) { + personState.clear(); } - private final JoinDoFn joinDoFn = new JoinDoFn(); - - public Query3(NexmarkConfiguration configuration) { - super(configuration, "Query3"); - } - - @Override - @Nullable - public Aggregator<Long, Long> getFatalCount() { - return joinDoFn.fatalCounter; - } - - private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) { - // Batch into incremental results windows. 
- events = events.apply( - Window.<Event>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))); - - PCollection<KV<Long, Auction>> auctionsBySellerId = - events - // Only want the new auction events. - .apply(JUST_NEW_AUCTIONS) - - // We only want auctions in category 10. - .apply(name + ".InCategory", Filter.by(new SerializableFunction<Auction, Boolean>() { - @Override - public Boolean apply(Auction auction) { - return auction.category == 10; - } - })) - - // Key auctions by their seller id. - .apply("AuctionBySeller", AUCTION_BY_SELLER); - - PCollection<KV<Long, Person>> personsById = - events - // Only want the new people events. - .apply(JUST_NEW_PERSONS) - - // We only want people in OR, ID, CA. - .apply(name + ".InState", Filter.by(new SerializableFunction<Person, Boolean>() { - @Override - public Boolean apply(Person person) { - return person.state.equals("OR") || person.state.equals("ID") - || person.state.equals("CA"); - } - })) - - // Key people by their id. - .apply("PersonById", PERSON_BY_ID); - - return - // Join auctions and people. - // concatenate KeyedPCollections - KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId) - .and(PERSON_TAG, personsById) - // group auctions and persons by personId - .apply(CoGroupByKey.<Long>create()) - .apply(name + ".Join", ParDo.of(joinDoFn)) - - // Project what we want. 
- .apply(name + ".Project", - ParDo.of(new DoFn<KV<Auction, Person>, NameCityStateId>() { - @ProcessElement - public void processElement(ProcessContext c) { - Auction auction = c.element().getKey(); - Person person = c.element().getValue(); - c.output(new NameCityStateId( - person.name, person.city, person.state, auction.id)); - } - })); - } - - @Override - protected PCollection<KnownSize> applyPrim(PCollection<Event> events) { - return NexmarkUtils.castToKnownSize(name, applyTyped(events)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java index 5cf4287..dca2887 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java @@ -25,10 +25,13 @@ import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesStatefulParDo; +import org.apache.beam.sdk.testing.UsesTimersInParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -95,6 +98,7 @@ public class QueryTest { } @Test + @Category({UsesStatefulParDo.class, UsesTimersInParDo.class}) public void query7MatchesModel() { queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG)); }
