Improve query5, query10 and query11

query5: Add comment on key lifting (issue #30)

query10: Add comment for strange groupByKey (issue #31)

query11: Replace Count.perKey with Count.perElement (issue #32)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7bfc982c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7bfc982c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7bfc982c

Branch: refs/heads/master
Commit: 7bfc982c77de52f49ba1b304a81bb0d53de5f44a
Parents: a7f9f7d
Author: Etienne Chauchot <[email protected]>
Authored: Fri Mar 24 14:29:08 2017 +0100
Committer: Ismaël Mejía <[email protected]>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 .../integration/nexmark/queries/Query10.java    |  3 +-
 .../integration/nexmark/queries/Query11.java    | 47 ++++++++++----------
 .../integration/nexmark/queries/Query5.java     |  2 +
 3 files changed, 27 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7bfc982c/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index 6912ed1..5246427 100644
--- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -322,8 +322,7 @@ public class Query10 extends NexmarkQuery {
             // We expect no late data here, but we'll assume the worst so we 
can detect any.
             .withAllowedLateness(Duration.standardDays(1))
             .discardingFiredPanes())
-      // TODO etienne: unnecessary groupByKey? because aggregators are shared 
in parallel
-      // and Pardo is also in parallel, why group all elements in memory of 
the same executor?
+      // This GroupByKey allows us to have one file per window.
       .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
         .apply(name + ".Index",
             ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/7bfc982c/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
index 4da99eb..a8a61ae 100644
--- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
+++ 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
@@ -48,29 +48,30 @@ public class Query11 extends NexmarkQuery {
   }
 
   private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
-    return events.apply(JUST_BIDS)
-        .apply(name + ".Rekey",
-          // TODO etienne: why not avoid this ParDo and do a Cont.perElement?
-          ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    Bid bid = c.element();
-                    c.output(KV.of(bid.bidder, (Void) null));
-                  }
-                }))
-        .apply(Window.<KV<Long, Void>>into(
-            
Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
-        
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
-        .discardingFiredPanes()
-        
.withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec 
/ 2)))
-        .apply(Count.<Long, Void>perKey())
-        .apply(name + ".ToResult",
-            ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(new BidsPerSession(c.element().getKey(), 
c.element().getValue()));
-                  }
-                }));
+    PCollection<Long> bidders = events.apply(JUST_BIDS).apply(name + ".Rekey",
+        ParDo.of(new DoFn<Bid, Long>() {
+
+          @ProcessElement public void processElement(ProcessContext c) {
+            Bid bid = c.element();
+            c.output(bid.bidder);
+          }
+        }));
+
+    PCollection<Long> biddersWindowed = bidders.apply(
+        Window.<Long>into(
+          
Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
+            .triggering(
+                
Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
+            .discardingFiredPanes()
+            
.withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec 
/ 2)));
+    PCollection<BidsPerSession> bidsPerSession = 
biddersWindowed.apply(Count.<Long>perElement())
+        .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, 
BidsPerSession>() {
+
+          @ProcessElement public void processElement(ProcessContext c) {
+            c.output(new BidsPerSession(c.element().getKey(), 
c.element().getValue()));
+          }
+        }));
+    return bidsPerSession;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7bfc982c/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
index 9f02ddb..34b7b50 100644
--- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
@@ -75,6 +75,8 @@ public class Query5 extends NexmarkQuery {
 
       // We'll want to keep all auctions with the maximal number of bids.
         // Start by lifting each into a singleton list.
+        // We need to do so because the combine below returns a list of auctions in the key in case of
+        // an equal number of bids. Combine needs to have the same input type and return type.
         .apply(name + ".ToSingletons",
             ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
                   @ProcessElement

Reply via email to