Improve query5, query10 and query11. query5: Add comment on key lifting (issue #30)
query10: Add comment for strange groupByKey (issue #31) query11: Replace Count.perKey by Count.perElement (issue #32) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7bfc982c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7bfc982c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7bfc982c Branch: refs/heads/master Commit: 7bfc982c77de52f49ba1b304a81bb0d53de5f44a Parents: a7f9f7d Author: Etienne Chauchot <[email protected]> Authored: Fri Mar 24 14:29:08 2017 +0100 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- .../integration/nexmark/queries/Query10.java | 3 +- .../integration/nexmark/queries/Query11.java | 47 ++++++++++---------- .../integration/nexmark/queries/Query5.java | 2 + 3 files changed, 27 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7bfc982c/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java index 6912ed1..5246427 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java @@ -322,8 +322,7 @@ public class Query10 extends NexmarkQuery { // We expect no late data here, but we'll assume the worst so we can detect any. .withAllowedLateness(Duration.standardDays(1)) .discardingFiredPanes()) - // TODO etienne: unnecessary groupByKey? 
because aggregators are shared in parallel - // and Pardo is also in parallel, why group all elements in memory of the same executor? + // this GroupByKey allows to have one file per window .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create()) .apply(name + ".Index", ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { http://git-wip-us.apache.org/repos/asf/beam/blob/7bfc982c/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java index 4da99eb..a8a61ae 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java @@ -48,29 +48,30 @@ public class Query11 extends NexmarkQuery { } private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) { - return events.apply(JUST_BIDS) - .apply(name + ".Rekey", - // TODO etienne: why not avoid this ParDo and do a Cont.perElement? 
- ParDo.of(new DoFn<Bid, KV<Long, Void>>() { - @ProcessElement - public void processElement(ProcessContext c) { - Bid bid = c.element(); - c.output(KV.of(bid.bidder, (Void) null)); - } - })) - .apply(Window.<KV<Long, Void>>into( - Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec))) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))) - .discardingFiredPanes() - .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2))) - .apply(Count.<Long, Void>perKey()) - .apply(name + ".ToResult", - ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(new BidsPerSession(c.element().getKey(), c.element().getValue())); - } - })); + PCollection<Long> bidders = events.apply(JUST_BIDS).apply(name + ".Rekey", + ParDo.of(new DoFn<Bid, Long>() { + + @ProcessElement public void processElement(ProcessContext c) { + Bid bid = c.element(); + c.output(bid.bidder); + } + })); + + PCollection<Long> biddersWindowed = bidders.apply( + Window.<Long>into( + Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec))) + .triggering( + Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))) + .discardingFiredPanes() + .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2))); + PCollection<BidsPerSession> bidsPerSession = biddersWindowed.apply(Count.<Long>perElement()) + .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { + + @ProcessElement public void processElement(ProcessContext c) { + c.output(new BidsPerSession(c.element().getKey(), c.element().getValue())); + } + })); + return bidsPerSession; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/7bfc982c/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java 
---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java index 9f02ddb..34b7b50 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java @@ -75,6 +75,8 @@ public class Query5 extends NexmarkQuery { // We'll want to keep all auctions with the maximal number of bids. // Start by lifting each into a singleton list. + // need to do so because the combine below returns a list of auctions in the key in case of + // equal number of bids. Combine needs to have same input type and return type. .apply(name + ".ToSingletons", ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() { @ProcessElement
